You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/09/23 20:17:24 UTC

svn commit: r1174935 - in /pig/branches/branch-0.9: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/

Author: daijy
Date: Fri Sep 23 18:17:24 2011
New Revision: 1174935

URL: http://svn.apache.org/viewvc?rev=1174935&view=rev
Log:
PIG-2237: LIMIT generates wrong number of records if pig determines no of reducers as more than 1

Added:
    pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java
    pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java
Modified:
    pig/branches/branch-0.9/CHANGES.txt
    pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/branches/branch-0.9/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java

Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1174935&r1=1174934&r2=1174935&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Fri Sep 23 18:17:24 2011
@@ -38,6 +38,8 @@ PIG-2221: Couldnt find documentation for
 
 BUG FIXES
 
+PIG-2237: LIMIT generates wrong number of records if pig determines no of reducers as more than 1 (daijy)
+
 PIG-2261: Restore support for parenthesis in Pig 0.9 (rding via daijy)
 
 PIG-2238: Pig 0.9 error message not useful as compared to 0.8 (daijy)

Added: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java?rev=1174935&view=auto
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java (added)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LimitAdjuster.java Fri Sep 23 18:17:24 2011
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Utils;
+
+public class LimitAdjuster extends MROpPlanVisitor {
+    ArrayList<MapReduceOper> opsToAdjust = new ArrayList<MapReduceOper>();  
+    PigContext pigContext;
+    NodeIdGenerator nig;
+    private String scope;
+
+
+    public LimitAdjuster(MROperPlan plan, PigContext pigContext) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+        this.pigContext = pigContext;
+        nig = NodeIdGenerator.getGenerator();
+        List<MapReduceOper> roots = plan.getRoots();
+        scope = roots.get(0).getOperatorKey().getScope();
+    }
+
+    @Override
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+        // Look for map-reduce operators that contain a limit operator.
+        // If one is found and its requestedParallelism is not exactly 1, record
+        // it so adjust() can add one more map-reduce operator with 1 reducer.
+        if (mr.limit!=-1 && mr.requestedParallelism!=1)
+        {
+            opsToAdjust.add(mr);
+        }
+    }
+    
+    public void adjust() throws IOException, PlanException
+    {
+        for (MapReduceOper mr:opsToAdjust)
+        {
+            if (mr.reducePlan.isEmpty()) continue;
+            List<PhysicalOperator> mpLeaves = mr.reducePlan.getLeaves();
+            if (mpLeaves.size() != 1) {
+                int errCode = 2024; 
+                String msg = "Expected reduce to have single leaf. Found " + mpLeaves.size() + " leaves.";
+                throw new MRCompilerException(msg, errCode, PigException.BUG);
+            }
+            PhysicalOperator mpLeaf = mpLeaves.get(0);
+            if (!pigContext.inIllustrator) {
+                if (!(mpLeaf instanceof POStore)) {
+                    int errCode = 2025;
+                    String msg = "Expected leaf of reduce plan to " +
+                        "always be POStore. Found " + mpLeaf.getClass().getSimpleName();
+                    throw new MRCompilerException(msg, errCode, PigException.BUG);
+                }
+            }
+            FileSpec oldSpec = ((POStore)mpLeaf).getSFile();
+            boolean oldIsTmpStore = ((POStore)mpLeaf).isTmpStore();
+            
+            FileSpec fSpec = new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
+                    new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
+            POStore storeOp = (POStore) mpLeaf;
+            storeOp.setSFile(fSpec);
+            storeOp.setIsTmpStore(true);
+            mr.setReduceDone(true);
+            MapReduceOper limitAdjustMROp = new MapReduceOper(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            ld.setPc(pigContext);
+            ld.setLFile(fSpec);
+            limitAdjustMROp.mapPlan.add(ld);
+            if (mr.isGlobalSort()) {
+                connectMapToReduceLimitedSort(limitAdjustMROp, mr);
+            } else {
+                MRUtil.simpleConnectMapToReduce(limitAdjustMROp, scope, nig);
+            }
+            // Need to split the original reduce plan across two map-reduce jobs:
+            // 1st: from the root (POPackage) up to and including POLimit
+            // 2nd: a duplicate POLimit, followed by everything up to the leaf (POStore)
+            // The reasons for doing that:
+            // 1. We need two map-reduce jobs; otherwise we can end up with up to
+            //    N*M records, where N is the number of reducers and M is the limit
+            //    constant. We need one extra map-reduce job with 1 reducer.
+            // 2. We don't want to leave the operators that follow POLimit in the first
+            //    map-reduce job, because:
+            //    * Foreach will shift the key type for the second map-reduce job, see PIG-461
+            //    * Foreach flatten may generate more than M records, which then get cut
+            //      by POLimit, see PIG-2231
+            splitReducerForLimit(limitAdjustMROp, mr);
+
+            if (mr.isGlobalSort()) 
+            {
+                limitAdjustMROp.setLimitAfterSort(true);
+                limitAdjustMROp.setSortOrder(mr.getSortOrder());
+            }
+            
+            POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            st.setSFile(oldSpec);
+            st.setIsTmpStore(oldIsTmpStore);
+            st.setSchema(((POStore)mpLeaf).getSchema());
+            limitAdjustMROp.reducePlan.addAsLeaf(st);
+            limitAdjustMROp.requestedParallelism = 1;
+            limitAdjustMROp.setLimitOnly(true);
+            
+            List<MapReduceOper> successorList = mPlan.getSuccessors(mr);
+            MapReduceOper successors[] = null;
+            
+            // Save a snapshot of the successors: we are about to modify mPlan,
+            // so using the live successor list directly would be problematic
+            if (successorList!=null && successorList.size()>0)
+            {
+                successors = new MapReduceOper[successorList.size()];
+                int i=0;
+                for (MapReduceOper op:successorList)
+                    successors[i++] = op;
+            }
+            
+            // Process UDFs
+            for (String udf : mr.UDFs) {
+                if (!limitAdjustMROp.UDFs.contains(udf)) {
+                    limitAdjustMROp.UDFs.add(udf);
+                }
+            }
+            
+            mPlan.add(limitAdjustMROp);
+            mPlan.connect(mr, limitAdjustMROp);
+            
+            if (successors!=null)
+            {
+                for (int i=0;i<successors.length;i++)
+                {
+                    MapReduceOper nextMr = successors[i];
+                    if (nextMr!=null)
+                        mPlan.disconnect(mr, nextMr);
+                    
+                    if (nextMr!=null)
+                        mPlan.connect(limitAdjustMROp, nextMr);                        
+                }
+            }
+        }
+    }
+    
+    // Add a duplicate POLimit to secondMROp's reduce plan, then move every operator
+    // between POLimit and POStore in firstMROp's reduce plan over to secondMROp
+    private void splitReducerForLimit(MapReduceOper secondMROp,
+            MapReduceOper firstMROp) throws PlanException, VisitorException {
+                    
+        PhysicalOperator op = firstMROp.reducePlan.getRoots().get(0);
+        assert(op instanceof POPackage);
+        
+        while (true) {
+            List<PhysicalOperator> succs = firstMROp.reducePlan
+                    .getSuccessors(op);
+            if (succs==null) break;
+            op = succs.get(0);
+            if (op instanceof POLimit) {
+                // find operator after POLimit
+                op = firstMROp.reducePlan.getSuccessors(op).get(0);
+                break;
+            }
+        }
+        
+        POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        pLimit2.setLimit(firstMROp.limit);
+        secondMROp.reducePlan.addAsLeaf(pLimit2);
+
+        while (true) {
+            if (op instanceof POStore) break;
+            PhysicalOperator opToMove = op;
+            List<PhysicalOperator> succs = firstMROp.reducePlan
+                    .getSuccessors(op);
+            op = succs.get(0);
+            
+            firstMROp.reducePlan.removeAndReconnect(opToMove);
+            secondMROp.reducePlan.addAsLeaf(opToMove);
+            
+        }
+    }
+    
+    private void connectMapToReduceLimitedSort(MapReduceOper mro, MapReduceOper sortMROp) throws PlanException, VisitorException
+    {
+        POLocalRearrange slr = (POLocalRearrange)sortMROp.mapPlan.getLeaves().get(0);
+        
+        POLocalRearrange lr = null;
+        try {
+            lr = slr.clone();
+        } catch (CloneNotSupportedException e) {
+            int errCode = 2147;
+            String msg = "Error cloning POLocalRearrange for limit after sort";
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
+        
+        mro.mapPlan.addAsLeaf(lr);
+        
+        POPackage spkg = (POPackage)sortMROp.reducePlan.getRoots().get(0);
+
+        POPackage pkg = null;
+        try {
+            pkg = spkg.clone();
+        } catch (Exception e) {
+            int errCode = 2148;
+            String msg = "Error cloning POPackageLite for limit after sort";
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
+        mro.reducePlan.add(pkg);
+        mro.reducePlan.addAsLeaf(MRUtil.getPlainForEachOP(scope, nig));
+    }
+}
\ No newline at end of file

Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1174935&r1=1174934&r2=1174935&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Sep 23 18:17:24 2011
@@ -342,10 +342,6 @@ public class MRCompiler extends PhyPlanV
         
         connectSoftLink();
         
-        LimitAdjuster la = new LimitAdjuster(MRPlan);
-        la.visit();
-        la.adjust();
-        
         return MRPlan;
     }
     
@@ -938,89 +934,6 @@ public class MRCompiler extends PhyPlanV
         }
     }
     
-    private void connectMapToReduceLimitedSort(MapReduceOper mro, MapReduceOper sortMROp) throws PlanException, VisitorException
-    {
-        POLocalRearrange slr = (POLocalRearrange)sortMROp.mapPlan.getLeaves().get(0);
-        
-        POLocalRearrange lr = null;
-        try {
-            lr = slr.clone();
-        } catch (CloneNotSupportedException e) {
-            int errCode = 2147;
-            String msg = "Error cloning POLocalRearrange for limit after sort";
-            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
-        }
-        
-        mro.mapPlan.addAsLeaf(lr);
-        
-        POPackage spkg = (POPackage)sortMROp.reducePlan.getRoots().get(0);
-
-        POPackage pkg = null;
-        try {
-            pkg = spkg.clone();
-        } catch (Exception e) {
-            int errCode = 2148;
-            String msg = "Error cloning POPackageLite for limit after sort";
-            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
-        }
-        mro.reducePlan.add(pkg);
-        mro.reducePlan.addAsLeaf(getPlainForEachOP());
-    }
-    
-    private void simpleConnectMapToReduce(MapReduceOper mro) throws PlanException
-    {
-        PhysicalPlan ep = new PhysicalPlan();
-        POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        prjStar.setResultType(DataType.TUPLE);
-        prjStar.setStar(true);
-        ep.add(prjStar);
-        
-        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
-        eps.add(ep);
-        
-        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        try {
-            lr.setIndex(0);
-        } catch (ExecException e) {
-            int errCode = 2058;
-            String msg = "Unable to set index on the newly created POLocalRearrange.";
-            throw new PlanException(msg, errCode, PigException.BUG, e);
-        }
-        lr.setKeyType(DataType.TUPLE);
-        lr.setPlans(eps);
-        lr.setResultType(DataType.TUPLE);
-        
-        mro.mapPlan.addAsLeaf(lr);
-        
-        POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        pkg.setKeyType(DataType.TUPLE);
-        pkg.setNumInps(1);
-        boolean[] inner = {false};
-        pkg.setInner(inner);
-        mro.reducePlan.add(pkg);
-        
-        mro.reducePlan.addAsLeaf(getPlainForEachOP());
-    }
-    
-    private POForEach getPlainForEachOP()
-    {
-        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
-        List<Boolean> flat1 = new ArrayList<Boolean>();
-        PhysicalPlan ep1 = new PhysicalPlan();
-        POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        prj1.setResultType(DataType.TUPLE);
-        prj1.setStar(false);
-        prj1.setColumn(1);
-        prj1.setOverloaded(true);
-        ep1.add(prj1);
-        eps1.add(ep1);
-        flat1.add(true);
-        POForEach fe = new POForEach(new OperatorKey(scope, nig
-                .getNextNodeId(scope)), -1, eps1, flat1);
-        fe.setResultType(DataType.BAG);
-        return fe;
-    }
-    
     @Override
     public void visitLimit(POLimit op) throws VisitorException{
         try{
@@ -1038,7 +951,7 @@ public class MRCompiler extends PhyPlanV
                 
                 if (mro.reducePlan.isEmpty())
                 {
-                    simpleConnectMapToReduce(mro);
+                    MRUtil.simpleConnectMapToReduce(mro, scope, nig);
                     mro.requestedParallelism = 1;
                     if (!pigContext.inIllustrator) {
                         POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
@@ -1557,7 +1470,7 @@ public class MRCompiler extends PhyPlanV
         
         // After getting an index entry in each mapper, send all of them to one 
         // reducer where they will be sorted on the way by Hadoop.
-        simpleConnectMapToReduce(indexerMROp);
+        MRUtil.simpleConnectMapToReduce(indexerMROp, scope, nig);
         
         indexerMROp.requestedParallelism = 1; // we need exactly one reducer for indexing job.
         
@@ -1762,7 +1675,7 @@ public class MRCompiler extends PhyPlanV
                 // Loader of mro will return a tuple of form - 
                 // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
 
-                simpleConnectMapToReduce(rightMROpr);
+                MRUtil.simpleConnectMapToReduce(rightMROpr, scope, nig);
                 rightMROpr.useTypedComparator(true);
                 
                 POStore st = getStore();
@@ -2831,164 +2744,6 @@ public class MRCompiler extends PhyPlanV
 
     }
     
-    private class LimitAdjuster extends MROpPlanVisitor {
-        ArrayList<MapReduceOper> opsToAdjust = new ArrayList<MapReduceOper>();  
-
-        LimitAdjuster(MROperPlan plan) {
-            super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
-        }
-
-        @Override
-        public void visitMROp(MapReduceOper mr) throws VisitorException {
-            // Look for map reduce operators which contains limit operator.
-            // If so and the requestedParallelism > 1, add one additional map-reduce
-            // operator with 1 reducer into the original plan
-            if (mr.limit!=-1 && mr.requestedParallelism!=1)
-            {
-                opsToAdjust.add(mr);
-            }
-        }
-        
-        public void adjust() throws IOException, PlanException
-        {
-            for (MapReduceOper mr:opsToAdjust)
-            {
-                if (mr.reducePlan.isEmpty()) continue;
-                List<PhysicalOperator> mpLeaves = mr.reducePlan.getLeaves();
-                if (mpLeaves.size() != 1) {
-                    int errCode = 2024; 
-                    String msg = "Expected reduce to have single leaf. Found " + mpLeaves.size() + " leaves.";
-                    throw new MRCompilerException(msg, errCode, PigException.BUG);
-                }
-                PhysicalOperator mpLeaf = mpLeaves.get(0);
-                if (!pigContext.inIllustrator)
-                if (!(mpLeaf instanceof POStore)) {
-                    int errCode = 2025;
-                    String msg = "Expected leaf of reduce plan to " +
-                        "always be POStore. Found " + mpLeaf.getClass().getSimpleName();
-                    throw new MRCompilerException(msg, errCode, PigException.BUG);
-                }
-                FileSpec oldSpec = ((POStore)mpLeaf).getSFile();
-                boolean oldIsTmpStore = ((POStore)mpLeaf).isTmpStore();
-                
-                FileSpec fSpec = getTempFileSpec();
-                ((POStore)mpLeaf).setSFile(fSpec);
-                ((POStore)mpLeaf).setIsTmpStore(true);
-                mr.setReduceDone(true);
-                MapReduceOper limitAdjustMROp = getMROp();
-                POLoad ld = getLoad();
-                ld.setLFile(fSpec);
-                limitAdjustMROp.mapPlan.add(ld);
-                if (mr.isGlobalSort()) {
-                    connectMapToReduceLimitedSort(limitAdjustMROp, mr);
-                } else {
-                    simpleConnectMapToReduce(limitAdjustMROp);
-                }
-                
-                // Need to split the original reduce plan into two mapreduce job:
-                // 1st: From the root(POPackage) to POLimit
-                // 2nd: From POLimit to leaves(POStore), duplicate POLimit
-                // The reason for doing that:
-                // 1. We need to have two map-reduce job, otherwise, we will end up with
-                //    N*M records, N is number of reducer, M is limit constant. We need
-                //    one extra mapreduce job with 1 reducer
-                // 2. We don't want to move operator after POLimit into the first mapreduce
-                //    job, because:
-                //    * Foreach will shift the key type for second mapreduce job, see PIG-461
-                //    * Foreach flatten may generating more than M records, which get cut
-                //      by POLimit, see PIG-2231
-                splitReducerForLimit(limitAdjustMROp, mr);
-
-                if (mr.isGlobalSort()) 
-                {
-                    limitAdjustMROp.setLimitAfterSort(true);
-                    limitAdjustMROp.setSortOrder(mr.getSortOrder());
-                }
-                
-                POStore st = getStore();
-                st.setSFile(oldSpec);
-                st.setIsTmpStore(oldIsTmpStore);
-                st.setSchema(((POStore)mpLeaf).getSchema());
-                limitAdjustMROp.reducePlan.addAsLeaf(st);
-                limitAdjustMROp.requestedParallelism = 1;
-                limitAdjustMROp.setLimitOnly(true);
-                
-                List<MapReduceOper> successorList = MRPlan.getSuccessors(mr);
-                MapReduceOper successors[] = null;
-                
-                // Save a snapshot for successors, since we will modify MRPlan, 
-                // use the list directly will be problematic
-                if (successorList!=null && successorList.size()>0)
-                {
-                    successors = new MapReduceOper[successorList.size()];
-                    int i=0;
-                    for (MapReduceOper op:successorList)
-                        successors[i++] = op;
-                }
-                
-                // Process UDFs
-                for (String udf : mr.UDFs) {
-                    if (!limitAdjustMROp.UDFs.contains(udf)) {
-                        limitAdjustMROp.UDFs.add(udf);
-                    }
-                }
-                
-                MRPlan.add(limitAdjustMROp);
-                MRPlan.connect(mr, limitAdjustMROp);
-                
-                if (successors!=null)
-                {
-                    for (int i=0;i<successors.length;i++)
-                    {
-                        MapReduceOper nextMr = successors[i];
-                        if (nextMr!=null)
-                            MRPlan.disconnect(mr, nextMr);
-                        
-                        if (nextMr!=null)
-                            MRPlan.connect(limitAdjustMROp, nextMr);                        
-                    }
-                }
-            }
-        }
-        
-        // Move all operators between POLimit and POStore in reducer plan 
-        // from firstMROp to the secondMROp
-        private void splitReducerForLimit(MapReduceOper secondMROp,
-                MapReduceOper firstMROp) throws PlanException, VisitorException {
-                        
-            PhysicalOperator op = firstMROp.reducePlan.getRoots().get(0);
-            assert(op instanceof POPackage);
-            
-            while (true) {
-                List<PhysicalOperator> succs = firstMROp.reducePlan
-                        .getSuccessors(op);
-                if (succs==null) break;
-                op = succs.get(0);
-                if (op instanceof POLimit) {
-                    // find operator after POLimit
-                    op = firstMROp.reducePlan.getSuccessors(op).get(0);
-                    break;
-                }
-            }
-            
-            POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            pLimit2.setLimit(firstMROp.limit);
-            secondMROp.reducePlan.addAsLeaf(pLimit2);
-
-            while (true) {
-                if (op instanceof POStore) break;
-                PhysicalOperator opToMove = op;
-                List<PhysicalOperator> succs = firstMROp.reducePlan
-                        .getSuccessors(op);
-                op = succs.get(0);
-                
-                firstMROp.reducePlan.removeAndReconnect(opToMove);
-                secondMROp.reducePlan.addAsLeaf(opToMove);
-                
-            }
-        }
-    }
-
     private static class FindKeyTypeVisitor extends PhyPlanVisitor {
 
         byte keyType = DataType.UNKNOWN;

Added: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java?rev=1174935&view=auto
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java (added)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java Fri Sep 23 18:17:24 2011
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+
+public class MRUtil {
+    // simpleConnectMapToReduce is a utility that ends the map phase and starts the reduce
+    //     phase of a map-reduce operator:
+    // 1. mro is expected to contain only a map plan
+    // 2. a POLocalRearrange is added to end the map plan, and a POPackage followed
+    //    by a plain POForEach (see getPlainForEachOP) is added to start the reduce plan
+    // 3. the POLocalRearrange/POPackage are trivial: the key is the whole tuple (project *)
+    static public void simpleConnectMapToReduce(MapReduceOper mro, String scope, NodeIdGenerator nig) throws PlanException
+    {
+        PhysicalPlan ep = new PhysicalPlan();
+        POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prjStar.setResultType(DataType.TUPLE);
+        prjStar.setStar(true);
+        ep.add(prjStar);
+        
+        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+        eps.add(ep);
+        
+        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        try {
+            lr.setIndex(0);
+        } catch (ExecException e) {
+            int errCode = 2058;
+            String msg = "Unable to set index on the newly created POLocalRearrange.";
+            throw new PlanException(msg, errCode, PigException.BUG, e);
+        }
+        lr.setKeyType(DataType.TUPLE);
+        lr.setPlans(eps);
+        lr.setResultType(DataType.TUPLE);
+        
+        mro.mapPlan.addAsLeaf(lr);
+        
+        POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        pkg.setKeyType(DataType.TUPLE);
+        pkg.setNumInps(1);
+        boolean[] inner = {false};
+        pkg.setInner(inner);
+        mro.reducePlan.add(pkg);
+        
+        mro.reducePlan.addAsLeaf(getPlainForEachOP(scope, nig));
+    }
+    
+    // Get a simple POForEach: ForEach X generate flatten($1)
+    static public POForEach getPlainForEachOP(String scope, NodeIdGenerator nig)
+    {
+        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+        List<Boolean> flat1 = new ArrayList<Boolean>();
+        PhysicalPlan ep1 = new PhysicalPlan();
+        POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prj1.setResultType(DataType.TUPLE);
+        prj1.setStar(false);
+        prj1.setColumn(1);
+        prj1.setOverloaded(true);
+        ep1.add(prj1);
+        eps1.add(ep1);
+        flat1.add(true);
+        POForEach fe = new POForEach(new OperatorKey(scope, nig
+                .getNextNodeId(scope)), -1, eps1, flat1);
+        fe.setResultType(DataType.BAG);
+        return fe;
+    }
+}

Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1174935&r1=1174934&r2=1174935&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Sep 23 18:17:24 2011
@@ -501,6 +501,10 @@ public class MapReduceLauncher extends L
         SampleOptimizer so = new SampleOptimizer(plan, pc);
         so.visit();
         
+        LimitAdjuster la = new LimitAdjuster(plan, pc);
+        la.visit();
+        la.adjust();
+        
         // Optimize to use secondary sort key if possible
         prop = pc.getProperties().getProperty("pig.exec.nosecondarykey");
         if (!pc.inIllustrator && !("true".equals(prop)))  {

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1174935&r1=1174934&r2=1174935&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestEvalPipeline2.java Fri Sep 23 18:17:24 2011
@@ -1620,4 +1620,34 @@ public class TestEvalPipeline2 {
         
         Assert.assertFalse(iter.hasNext());
     }
+    
+    // See PIG-2237: LIMIT after ORDER BY must yield exactly the requested rows even when Pig determines more than one reducer.
+    @Test
+    public void testLimitAutoReducer() throws Exception{
+        String[] input = { // tab-separated (a0, a1) rows, deliberately not sorted by a0
+                "1\tA",
+                "4\tB",
+                "2\tC",
+                "3\tD",
+                "6\tE",
+                "5\tF"
+        };
+        
+        Util.createInputFile(cluster, "table_testLimitAutoReducer", input);
+        // NOTE(review): 9 bytes per reducer is presumably chosen so the estimated reducer count exceeds 1 for this input - confirm.
+        pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "9");
+        pigServer.registerQuery("A = load 'table_testLimitAutoReducer' as (a0, a1);");
+        pigServer.registerQuery("B = order A by a0;");
+        pigServer.registerQuery("C = limit B 2;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        // Expect the two rows with the smallest a0, in ascending order, and nothing after them.
+        Tuple t = iter.next();
+        Assert.assertTrue(t.toString().equals("(1,A)"));
+        
+        t = iter.next();
+        Assert.assertTrue(t.toString().equals("(2,C)"));
+        
+        Assert.assertFalse(iter.hasNext());
+    }
 }

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java?rev=1174935&r1=1174934&r2=1174935&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestMRCompiler.java Fri Sep 23 18:17:24 2011
@@ -44,6 +44,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.GFCross;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LimitAdjuster;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
@@ -901,6 +902,11 @@ public class TestMRCompiler extends juni
     	
     	PhysicalPlan pp = Util.buildPp(pigServerMR, query);
     	MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+    	
+    	LimitAdjuster la = new LimitAdjuster(mrPlan, pc);
+        la.visit();
+        la.adjust();
+
     	MapReduceOper mrOper = mrPlan.getRoots().get(0);
     	int count = 1;
     	
@@ -995,6 +1001,11 @@ public class TestMRCompiler extends juni
          
         PhysicalPlan pp = Util.buildPp(pigServerMR, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        LimitAdjuster la = new LimitAdjuster(mrPlan, pc);
+        la.visit();
+        la.adjust();
+        
         MapReduceOper mrOper = mrPlan.getRoots().get(0);
         int count = 1;