You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2017/04/12 17:39:35 UTC

svn commit: r1791166 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/util/ src/org/apache/pig/data/ src/org/apache/pig/newplan/logical/optimizer/...

Author: daijy
Date: Wed Apr 12 17:39:34 2017
New Revision: 1791166

URL: http://svn.apache.org/viewvc?rev=1791166&view=rev
Log:
PIG-5211: Optimize Nested Limited Sort

Added:
    pig/trunk/src/org/apache/pig/data/LimitedSortedDataBag.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java
    pig/trunk/test/org/apache/pig/test/TestOptimizeNestedLimit.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
    pig/trunk/src/org/apache/pig/data/BagFactory.java
    pig/trunk/src/org/apache/pig/data/DefaultBagFactory.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
    pig/trunk/test/org/apache/pig/test/NonDefaultBagFactory.java
    pig/trunk/test/org/apache/pig/test/TestDataBag.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java
    pig/trunk/test/org/apache/pig/test/TestPOSort.java
    pig/trunk/test/org/apache/pig/test/TestSecondarySort.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Apr 12 17:39:34 2017
@@ -36,6 +36,8 @@ PIG-5067: Revisit union on numeric type
  
 IMPROVEMENTS
 
+PIG-5211: Optimize Nested Limited Sort (jins via daijy)
+
 PIG-5214: search any substring in the input string (rainer-46 via daijy)
 
 PIG-5210: Option to print MR/Tez plan before launching (ly16 via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Wed Apr 12 17:39:34 2017
@@ -267,10 +267,15 @@ public class POSort extends PhysicalOper
                     }
                 }
             }
-			// by default, we create InternalSortedBag, unless user configures
-            // explicitly to use old bag
-            sortedBag = useDefaultBag ? mBagFactory.newSortedBag(mComparator)
-                    : new InternalSortedBag(3, mComparator);
+
+            if (isLimited()) {
+                sortedBag = mBagFactory.newLimitedSortedBag(mComparator, limit);
+            } else {
+                // by default, we create InternalSortedBag, unless user configures
+                // explicitly to use old bag
+	            sortedBag = useDefaultBag ? mBagFactory.newSortedBag(mComparator)
+	                    : new InternalSortedBag(3, mComparator);
+            }
 
             while (inp.returnStatus != POStatus.STATUS_EOP) {
 				if (inp.returnStatus == POStatus.STATUS_ERR) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Wed Apr 12 17:39:34 2017
@@ -569,6 +569,12 @@ public class SecondaryKeyOptimizerUtil {
 
         // We see POSort, check which key it is using
         public boolean processSort(POSort sort) throws FrontendException{
+
+            // if sort has a limit, it is already optimized by NestedLimitedSort optimizer
+            if (sort.isLimited()) {
+                return true;
+            }
+
             SortKeyInfo keyInfo = new SortKeyInfo();
             for (int i = 0; i < sort.getSortPlans().size(); i++) {
                 PhysicalPlan sortPlan = sort.getSortPlans().get(i);

Modified: pig/trunk/src/org/apache/pig/data/BagFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/BagFactory.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/BagFactory.java (original)
+++ pig/trunk/src/org/apache/pig/data/BagFactory.java Wed Apr 12 17:39:34 2017
@@ -98,7 +98,17 @@ public abstract class BagFactory {
      * @return default data bag.
      */
     public abstract DataBag newDefaultBag(List<Tuple> listOfTuples);
-    
+
+    /**
+     * Get a limited sorted data bag.  Limited sorted bags are sorted bags
+     * with number of elements no more than limit.  The default
+     * implementation (LimitedSortedDataBag, returned by DefaultBagFactory)
+     * keeps only the first {@code limit} tuples in sort order and discards
+     * the rest as tuples are added, which lets a nested ORDER followed by
+     * LIMIT avoid sorting the whole bag.
+     * @param comp Comparator that controls how the data is sorted.
+     * If null, default comparator will be used.
+     * @param limit max number of tuples in bag
+     * @return a sorted data bag
+     */
+    public abstract DataBag newLimitedSortedBag(Comparator<Tuple> comp, long limit);
+
     /**
      * Get a sorted data bag.  Sorted bags guarantee that when an iterator
      * is opened on the bag the tuples will be returned in sorted order.

Modified: pig/trunk/src/org/apache/pig/data/DefaultBagFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DefaultBagFactory.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DefaultBagFactory.java (original)
+++ pig/trunk/src/org/apache/pig/data/DefaultBagFactory.java Wed Apr 12 17:39:34 2017
@@ -46,6 +46,17 @@ public class DefaultBagFactory extends B
     }
 
     /**
+     * Create a {@link LimitedSortedDataBag} that keeps at most {@code limit}
+     * tuples, in the order defined by {@code comp}.
+     * @param comp Comparator that controls how the data is sorted.
+     * If null, default comparator will be used.
+     * @param limit maximum number of tuples retained by the bag
+     * @return a new limited sorted data bag
+     */
+    @Override
+    public DataBag newLimitedSortedBag(Comparator<Tuple> comp, long limit) {
+        return new LimitedSortedDataBag(comp, limit);
+    }
+
+    /**
      * Get a sorted data bag.
      * @param comp Comparator that controls how the data is sorted.
      * If null, default comparator will be used.

Added: pig/trunk/src/org/apache/pig/data/LimitedSortedDataBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/LimitedSortedDataBag.java?rev=1791166&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/LimitedSortedDataBag.java (added)
+++ pig/trunk/src/org/apache/pig/data/LimitedSortedDataBag.java Wed Apr 12 17:39:34 2017
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+
+
+/**
+ * An ordered collection of Tuples (possibly) with multiples.  Data is
+ * stored in a priority queue as it comes in, and only sorted when iterator is requested.
+ *
+ * LimitedSortedDataBag is not spillable.
+ *
+ * We allow a user defined comparator, but provide a default comparator in
+ * cases where the user doesn't specify one.
+ */
+public class LimitedSortedDataBag implements DataBag {
+
+    private static final Log log = LogFactory.getLog(LimitedSortedDataBag.class);
+    private static final long serialVersionUID = 1L;
+
+    private final Comparator<Tuple> mComp;
+	private final PriorityQueue<Tuple> priorityQ;
+	private final long limit;
+
+	/**
+	 * @param comp Comparator to use to do the sorting.
+	 * If null, DefaultComparator will be used.
+	 */
+	public LimitedSortedDataBag(Comparator<Tuple> comp, long limit) {
+	    this.mComp = comp == null ? new DefaultComparator() : comp;
+	    this.limit = limit;
+	    this.priorityQ = new PriorityQueue<Tuple>(
+	            (int)limit, getReversedComparator(mComp));
+	}
+
+	/**
+     * Get the number of elements in the bag in memory.
+     * @return number of elements in the bag
+     */
+	@Override
+	public long size() {
+	   return priorityQ.size();
+	}
+
+	/**
+     * Find out if the bag is sorted.
+     * @return true if this is a sorted data bag, false otherwise.
+     */
+	@Override
+	public boolean isSorted() {
+	    return true;
+	}
+
+	/**
+     * Find out if the bag is distinct.
+     * @return true if the bag is a distinct bag, false otherwise.
+     */
+	@Override
+	public boolean isDistinct() {
+	    return false;
+	}
+
+	/**
+     * Get an iterator to the bag. For default and distinct bags,
+     * no particular order is guaranteed. For sorted bags the order
+     * is guaranteed to be sorted according
+     * to the provided comparator.
+     * @return tuple iterator
+     */
+	@Override
+	public Iterator<Tuple> iterator() {
+	    return new LimitedSortedDataBagIterator();
+	}
+
+	/**
+     * Add a tuple to the bag.
+     * @param t tuple to add.
+     */
+    @Override
+    public void add(Tuple t) {
+        priorityQ.add(t);
+        if (priorityQ.size() > limit) {
+            priorityQ.poll();
+        }
+    }
+
+    /**
+     * Add contents of a bag to the bag.
+     * @param b bag to add contents of.
+     */
+    @Override
+    public void addAll(DataBag b) {
+        Iterator<Tuple> it = b.iterator();
+        while(it.hasNext()) {
+            add(it.next());
+        }
+    }
+
+    /**
+     * Clear out the contents of the bag, both on disk and in memory.
+     * Any attempts to read after this is called will produce undefined
+     * results.
+     */
+    @Override
+    public void clear() {
+        priorityQ.clear();
+    }
+
+    /**
+     * Write a bag's contents to disk.
+     * @param out DataOutput to write data to.
+     * @throws IOException (passes it on from underlying calls).
+     */
+    public void write(DataOutput out) throws IOException {
+        // We don't care whether this bag was sorted or distinct because
+        // using the iterator to write it will guarantee those things come
+        // correctly.  And on the other end there'll be no reason to waste
+        // time re-sorting or re-applying distinct.
+        out.writeLong(size());
+        Iterator<Tuple> it = iterator();
+        while (it.hasNext()) {
+            Tuple item = it.next();
+            item.write(out);
+        }
+    }
+
+    /**
+     * Read a bag from disk.
+     * @param in DataInput to read data from.
+     * @throws IOException (passes it on from underlying calls).
+     */
+    public void readFields(DataInput in) throws IOException {
+        long size = in.readLong();
+
+        for (long i = 0; i < size; i++) {
+            try {
+                Object o = DataReaderWriter.readDatum(in);
+                add((Tuple)o);
+            } catch (ExecException ee) {
+                throw ee;
+            }
+        }
+    }
+
+    /**
+     * Write the bag into a string.
+     */
+    @Override
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append('{');
+        Iterator<Tuple> it = iterator();
+        while ( it.hasNext() ) {
+            Tuple t = it.next();
+            String s = t.toString();
+            sb.append(s);
+            if (it.hasNext()) sb.append(",");
+        }
+        sb.append('}');
+        return sb.toString();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int compareTo(Object other) {
+        if (this == other)
+            return 0;
+        if (other instanceof DataBag) {
+            DataBag bOther = (DataBag) other;
+            if (this.size() != bOther.size()) {
+                if (this.size() > bOther.size()) return 1;
+                else return -1;
+            }
+
+            // if we got this far, both bags should have same size
+            // make a LimitedSortedBag for the other bag with same comparator and limit
+            // so that both bag are sorted and we can loop through both iterators
+            DataBag otherCloneDataBag = new LimitedSortedDataBag(mComp, limit);
+            otherCloneDataBag.addAll((DataBag) other);
+
+            Iterator<Tuple> thisIt = this.iterator();
+            Iterator<Tuple> otherIt = otherCloneDataBag.iterator();
+
+            while (thisIt.hasNext() && otherIt.hasNext()) {
+                Tuple thisT = thisIt.next();
+                Tuple otherT = otherIt.next();
+
+                int c = thisT.compareTo(otherT);
+                if (c != 0) return c;
+            }
+            return 0;   // if we got this far, they must be equal
+        } else {
+            return DataType.compare(this, other);
+        }
+    }
+
+    /**
+     * Not implemented.
+     * This is used by FuncEvalSpec.FakeDataBag.
+     * @param stale Set stale state.
+     */
+    @Override
+    public void markStale(boolean stale) {
+        throw new RuntimeException("LimitedSortedDataBag cannot be marked stale");
+    }
+
+    /**
+     * Not implemented.
+     */
+    @Override
+    public long spill() {
+        return 0;
+    }
+
+    /**
+     * Not implemented.
+     */
+    @Override
+    public long getMemorySize() {
+        return 0;
+    }
+
+    private static class DefaultComparator implements Comparator<Tuple> {
+        @Override
+        @SuppressWarnings("unchecked")
+        public int compare(Tuple t1, Tuple t2) {
+            return t1.compareTo(t2);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return 42;
+        }
+    }
+
+   /**
+    * Since comparator in Java 1.7 does not have reversed(),
+    * we need this method to get a reversed comparator for a given comparator
+    * @param comp Comparator to reverse
+    * @return reversed comparator
+    */
+    private <T> Comparator<T> getReversedComparator(final Comparator<T> comp) {
+
+        return new Comparator<T>() {
+
+            @Override
+            public int compare(T o1, T o2) {
+                return -comp.compare(o1, o2);
+            }
+
+            @Override
+            public boolean equals(Object o) {
+                return comp.equals(o);
+            }
+        };
+    }
+
+    /**
+     * An iterator that handles getting the next tuple from the bag.
+     * Since priority queue iterator does not return elements in any order
+     * we need to dump elements in a List, sort them, and return iterator of the List
+     */
+    private class LimitedSortedDataBagIterator implements Iterator<Tuple> {
+
+        private int mCntr;
+        private final List<Tuple> mContents;
+
+        public LimitedSortedDataBagIterator() {
+            mCntr = 0;
+            mContents = new ArrayList<>(priorityQ);
+            Collections.sort(mContents, mComp);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return (mCntr < mContents.size());
+        }
+
+        @Override
+        public Tuple next() {
+            // This will report progress every 1024 times through next.
+            // This should be much faster than using mod.
+            if ((mCntr & 0x3ff) == 0) reportProgress();
+
+            return mContents.get(mCntr++);
+        }
+
+        /**
+         * Not implemented.
+         */
+        public void remove() {
+            throw new RuntimeException("Cannot remove() from LimitedSortedDataBag.iterator()");
+        }
+    }
+
+    /**
+     * Report progress to HDFS.
+     */
+    protected void reportProgress() {
+        if (PhysicalOperator.getReporter() != null) {
+            PhysicalOperator.getReporter().progress();
+        }
+    }
+}

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Wed Apr 12 17:39:34 2017
@@ -40,6 +40,7 @@ import org.apache.pig.newplan.logical.ru
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
 import org.apache.pig.newplan.logical.rules.MergeFilter;
 import org.apache.pig.newplan.logical.rules.MergeForEach;
+import org.apache.pig.newplan.logical.rules.NestedLimitOptimizer;
 import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
 import org.apache.pig.newplan.logical.rules.PredicatePushdownOptimizer;
 import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
@@ -202,6 +203,15 @@ public class LogicalPlanOptimizer extend
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);
+
+        // Nested Limit Set
+        // This set of rules push up nested limit
+        s = new HashSet<Rule>();
+        // Optimize limit
+        r = new NestedLimitOptimizer("NestedLimitOptimizer");
+        checkAndAddRule(s, r);
+        if (!s.isEmpty())
+            ls.add(s);
 
         return ls;
     }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Wed Apr 12 17:39:34 2017
@@ -259,6 +259,7 @@ public class LogicalPlan extends BaseOpe
             disabledOptimizerRules.add("MergeForEach");
             disabledOptimizerRules.add("PartitionFilterOptimizer");
             disabledOptimizerRules.add("LimitOptimizer");
+            disabledOptimizerRules.add("NestedLimitOptimizer");
             disabledOptimizerRules.add("SplitFilter");
             disabledOptimizerRules.add("PushUpFilter");
             disabledOptimizerRules.add("MergeFilter");

Added: pig/trunk/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java?rev=1791166&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java Wed Apr 12 17:39:34 2017
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+/**
+ * Logical optimizer rule that folds a nested LIMIT into the ORDER BY it
+ * directly follows. Inside a foreach's inner plan, an LOLimit whose only
+ * predecessor is an LOSort (and which is that sort's only successor) is
+ * removed, and its limit is recorded on the LOSort via setLimit(). The
+ * physical sort can then keep just the first N tuples (see POSort's limited
+ * bag path) instead of sorting the whole bag and truncating afterwards.
+ */
+public class NestedLimitOptimizer extends Rule {
+
+    public NestedLimitOptimizer(String name) {
+        super(name, false);
+    }
+
+    /**
+     * The pattern is a single LOForEach; the sort/limit pair inside its inner
+     * plan is located by the transformer.
+     */
+    @Override
+    protected OperatorPlan buildPattern() {
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator forEach = new LOForEach(plan);
+        plan.add(forEach);
+        return plan;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new OptimizeNestedLimitTransformer();
+    }
+
+    /**
+     * Find an LOLimit in innerPlan that can safely be folded into the LOSort
+     * directly before it.
+     * @param innerPlan inner plan of a foreach
+     * @return the first foldable LOLimit, or null if there is none
+     */
+    private static LOLimit findFoldableLimit(LogicalPlan innerPlan) {
+        Iterator<Operator> it = innerPlan.getOperators();
+        while (it.hasNext()) {
+            Operator op = it.next();
+            if (!(op instanceof LOLimit)) {
+                continue;
+            }
+            LOLimit limit = (LOLimit) op;
+            // Only fold non-negative limits: a negative value on the sort would
+            // presumably mean "unlimited", silently dropping the limit.
+            // NOTE(review): confirm LOLimit.getLimit() for expression-based limits.
+            if (limit.getLimit() < 0) {
+                continue;
+            }
+            // Limit should always have exactly 1 predecessor; guard anyway.
+            List<Operator> preds = innerPlan.getPredecessors(limit);
+            if (preds == null || preds.size() != 1 || !(preds.get(0) instanceof LOSort)) {
+                continue;
+            }
+            // If the sorted bag is also consumed elsewhere, limiting the sort
+            // would truncate what those other consumers see.
+            List<Operator> sortSuccs = innerPlan.getSuccessors(preds.get(0));
+            if (sortSuccs == null || sortSuccs.size() != 1) {
+                continue;
+            }
+            return limit;
+        }
+        return null;
+    }
+
+    public class OptimizeNestedLimitTransformer extends Transformer {
+
+        /** Matches when the foreach's inner plan has a foldable sort + limit pair. */
+        @Override
+        public boolean check(OperatorPlan matched) {
+            LOForEach forEach = (LOForEach) matched.getSources().get(0);
+            return findFoldableLimit(forEach.getInnerPlan()) != null;
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return currentPlan;
+        }
+
+        /** Move the limit onto the LOSort and remove the LOLimit. */
+        @Override
+        public void transform(OperatorPlan matched) throws FrontendException {
+            LOForEach forEach = (LOForEach) matched.getSources().get(0);
+            LogicalPlan innerPlan = forEach.getInnerPlan();
+
+            LOLimit limit = findFoldableLimit(innerPlan);
+            if (limit == null) {
+                // check() should have rejected this; nothing to do.
+                return;
+            }
+            LOSort sort = (LOSort) innerPlan.getPredecessors(limit).get(0);
+
+            // The sort may already carry a limit from an earlier fold (e.g.
+            // "limit 3" then "limit 5"); keep the tighter bound.
+            long newLimit = limit.getLimit();
+            if (sort.getLimit() >= 0 && sort.getLimit() < newLimit) {
+                newLimit = sort.getLimit();
+            }
+            sort.setLimit(newLimit);
+            innerPlan.removeAndReconnect(limit);
+        }
+    }
+}

Modified: pig/trunk/test/org/apache/pig/test/NonDefaultBagFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/NonDefaultBagFactory.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/NonDefaultBagFactory.java (original)
+++ pig/trunk/test/org/apache/pig/test/NonDefaultBagFactory.java Wed Apr 12 17:39:34 2017
@@ -33,6 +33,7 @@ public class NonDefaultBagFactory extend
     public DataBag newDefaultBag(List<Tuple> listOfTuples) {
         return null;
     }
+    public DataBag newLimitedSortedBag(Comparator<Tuple> comp, long limit) { return null; }
     public DataBag newSortedBag(Comparator<Tuple> comp) { return null; }
     public DataBag newDistinctBag() { return null; }
 

Modified: pig/trunk/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestDataBag.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestDataBag.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestDataBag.java Wed Apr 12 17:39:34 2017
@@ -34,6 +34,7 @@ import java.util.PriorityQueue;
 import java.util.Random;
 import java.util.TreeSet;
 
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DefaultDataBag;
@@ -42,6 +43,7 @@ import org.apache.pig.data.DistinctDataB
 import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.InternalDistinctBag;
 import org.apache.pig.data.InternalSortedBag;
+import org.apache.pig.data.LimitedSortedDataBag;
 import org.apache.pig.data.NonSpillableDataBag;
 import org.apache.pig.data.SingleTupleBag;
 import org.apache.pig.data.SortedDataBag;
@@ -741,10 +743,13 @@ public class TestDataBag  {
 
         DataBag bag = f.newDefaultBag();
         DataBag sorted = f.newSortedBag(null);
+        DataBag limitedSorted = f.newLimitedSortedBag(null, 1);
         DataBag distinct = f.newDistinctBag();
 
         assertTrue("Expected a default bag", (bag instanceof DefaultDataBag));
         assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag));
+        assertTrue("Expected a limited sorted bag",
+                (limitedSorted instanceof LimitedSortedDataBag));
         assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
     }
 
@@ -1231,7 +1236,42 @@ public class TestDataBag  {
         assertFalse(iter.hasNext());
         assertFalse("hasNext should be idempotent", iter.hasNext());
     }
-}
-
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testLimitedSortedBag() throws ExecException {
+        DataBag bag = new LimitedSortedDataBag(null, 2);
+        // Insert 2, 0, 1 out of order; with a limit of 2 only the two
+        // smallest tuples (0 and 1) should be retained.
+        for (int value : new int[] {2, 0, 1}) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, value);
+            bag.add(t);
+        }
 
+        // test size(); JUnit's assertEquals takes (expected, actual)
+        assertEquals(2, bag.size());
+        // test isSorted()
+        assertTrue(bag.isSorted());
+        // test isDistinct()
+        assertFalse(bag.isDistinct());
+        // test iterator(): tuples come back in ascending order
+        Iterator<Tuple> it = bag.iterator();
+        assertEquals(0, it.next().get(0));
+        assertEquals(1, it.next().get(0));
+        assertFalse(it.hasNext());
+        // test addAll(): a limit-1 bag keeps only the smallest tuple
+        DataBag bag1 = new LimitedSortedDataBag(null, 1);
+        bag1.addAll(bag);
+        assertEquals(1, bag1.size());
+        assertEquals(0, bag1.iterator().next().get(0));
+        // test compareTo(): equal to itself, larger than a smaller bag
+        assertEquals(0, bag.compareTo(bag));
+        assertEquals(1, bag.compareTo(bag1));
+        // test clear()
+        bag1.clear();
+        assertEquals(0, bag1.size());
+    }
+}

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java Wed Apr 12 17:39:34 2017
@@ -35,10 +35,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.StringTokenizer;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Appender;
 import org.apache.log4j.FileAppender;
@@ -73,8 +72,10 @@ import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
+import junit.framework.Assert;
+
 public class TestEvalPipelineLocal {
-    
+
     private PigServer pigServer;
 
     static final int MAX_SIZE = 100000;
@@ -1289,4 +1290,38 @@ public class TestEvalPipelineLocal {
 
         Assert.assertTrue(bos.toString().contains("New For Each(false,false)[tuple]"));
     }
+
+    @Test
+    public void testNestedLimitedSort() throws Exception {
+
+        File f1 = createFile(new String[]{
+                "katie carson\t25\t3.65",
+                "katie carson\t65\t0.73",
+                "katie carson\t57\t2.43",
+                "katie carson\t55\t3.77",
+                "holly white\t43\t0.24"});
+
+        String query = "a = load '" + Util.generateURI(f1.toString(), pigServer.getPigContext()) +
+                       "' as (name:chararray,age:int, gpa:double);" +
+                       "b = group a by name;" +
+                       "c = foreach b {" +
+                       "c1 = a.(age, gpa);" +
+                       "c2 = order c1 by age;" +
+                       "c3 = limit c2 3;" +
+                       "generate c3;}";
+
+        pigServer.registerQuery(query);
+        Iterator<Tuple> iter = pigServer.openIterator("c");
+
+        Set<String> expectedResultSet = new HashSet<>();
+        expectedResultSet.add("({(25,3.65),(55,3.77),(57,2.43)})");
+        expectedResultSet.add("({(43,0.24)})");
+
+        Set<String> resultSet = new HashSet<>();
+        resultSet.add(iter.next().toString());
+        resultSet.add(iter.next().toString());
+
+        assertTrue(resultSet.equals(expectedResultSet));
+        assertFalse(iter.hasNext());
+    }
 }

Added: pig/trunk/test/org/apache/pig/test/TestOptimizeNestedLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestOptimizeNestedLimit.java?rev=1791166&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestOptimizeNestedLimit.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestOptimizeNestedLimit.java Wed Apr 12 17:39:34 2017
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.rules.NestedLimitOptimizer;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that a LIMIT directly following an ORDER inside a nested FOREACH ends up as a limit on
+ * the sort operator (with no separate limit operator) in the logical, physical and MapReduce
+ * plans. The logical plan is optimized with a rule set containing only NestedLimitOptimizer.
+ */
+public class TestOptimizeNestedLimit {
+
+    /** Limit used in the nested query; the sort operators are expected to carry this value. */
+    private static final int LIMIT = 5;
+
+    LogicalPlan lp;
+    PhysicalPlan php;
+    MROperPlan mrp;
+
+    @Before
+    public void setup() throws Exception {
+
+        PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
+        PigServer pigServer = new PigServer(pc);
+        String query = "A = load 'myfile';" +
+                "B = group A by $0;" +
+                "C = foreach B {" +
+                "C1 = order A by $1;" +
+                "C2 = limit C1 " + LIMIT + ";" +
+                "generate C2;}" +
+                "store C into 'empty';";
+        lp = optimizePlan(Util.buildLp(pigServer, query));
+        php = Util.buildPp(pigServer, query);
+        mrp = Util.buildMRPlanWithOptimizer(php, pc);
+    }
+
+    @After
+    public void tearDown() {
+        // Nothing to clean up.
+    }
+
+    /** The nested logical plan keeps a single LOSort carrying the limit and no LOLimit. */
+    @Test
+    public void testLogicalPlan() throws Exception {
+
+        Iterator<Operator> it = lp.getOperators();
+
+        LOForEach forEach = null;
+        while (it.hasNext()) {
+            Operator op = it.next();
+            if (op instanceof LOForEach) {
+                assertNull("There should be only one LOForEach", forEach);
+                forEach = (LOForEach) op;
+            }
+        }
+        assertNotNull("ForEach is missing", forEach);
+
+        it = forEach.getInnerPlan().getOperators();
+        LOSort sort = null;
+        while (it.hasNext()) {
+            Operator op = it.next();
+            if (op instanceof LOLimit) {
+                fail("There should be no LOLimit");
+            } else if (op instanceof LOSort) {
+                assertNull("There should be only one LOSort", sort);
+                sort = (LOSort) op;
+            }
+        }
+        assertNotNull("LOSort is missing", sort);
+        assertEquals(LIMIT, sort.getLimit());
+    }
+
+    /** The physical plan's nested POSort carries the limit and no POLimit remains. */
+    @Test
+    public void testPhysicalPlan() throws Exception {
+        assertLimitFoldedIntoSort(php);
+    }
+
+    /** Applies the physical-plan check to the reduce plan of the single MR leaf job. */
+    @Test
+    public void testMRPlan() throws Exception {
+
+        List<MapReduceOper> ops = mrp.getLeaves();
+        assertEquals(1, ops.size());
+
+        assertLimitFoldedIntoSort(ops.get(0).reducePlan);
+    }
+
+    /**
+     * Asserts that {@code plan} holds exactly one POForEach with a single input plan, and that
+     * this inner plan contains exactly one POSort whose limit is {@link #LIMIT} and no POLimit.
+     */
+    private static void assertLimitFoldedIntoSort(PhysicalPlan plan) {
+        Iterator<PhysicalOperator> it = plan.iterator();
+
+        POForEach forEach = null;
+        while (it.hasNext()) {
+            PhysicalOperator op = it.next();
+            if (op instanceof POForEach) {
+                assertNull("There should be only 1 POForEach", forEach);
+                forEach = (POForEach) op;
+            }
+        }
+        assertNotNull("POForEach is missing", forEach);
+
+        List<PhysicalPlan> inps = forEach.getInputPlans();
+        assertEquals(1, inps.size());
+
+        it = inps.get(0).iterator();
+        POSort sort = null;
+        while (it.hasNext()) {
+            PhysicalOperator op = it.next();
+            if (op instanceof POLimit) {
+                fail("There should be no POLimit");
+            } else if (op instanceof POSort) {
+                assertNull("There should be only 1 POSort", sort);
+                sort = (POSort) op;
+            }
+        }
+        assertNotNull("POSort is missing", sort);
+        assertEquals(LIMIT, sort.getLimit());
+    }
+
+    /**
+     * Logical optimizer whose rule sets contain only {@link NestedLimitOptimizer}, so the
+     * logical-plan test observes that rule in isolation. Static: it needs no test-instance state.
+     */
+    public static class MyPlanOptimizer extends LogicalPlanOptimizer {
+        protected MyPlanOptimizer(OperatorPlan p, int iterations) {
+            super(p, iterations, new HashSet<String>());
+        }
+
+        @Override
+        protected List<Set<Rule>> buildRuleSets() {
+            Set<Rule> rules = new HashSet<Rule>();
+            rules.add(new NestedLimitOptimizer("OptimizeNestedLimit"));
+
+            List<Set<Rule>> ruleSets = new ArrayList<Set<Rule>>();
+            ruleSets.add(rules);
+            return ruleSets;
+        }
+    }
+
+    /** Runs {@link MyPlanOptimizer} over {@code plan} (in place) and returns the same plan. */
+    private LogicalPlan optimizePlan(LogicalPlan plan) throws IOException {
+        PlanOptimizer optimizer = new MyPlanOptimizer(plan, 3);
+        optimizer.optimize();
+        return plan;
+    }
+}

Modified: pig/trunk/test/org/apache/pig/test/TestPOSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOSort.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOSort.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOSort.java Wed Apr 12 17:39:34 2017
@@ -593,6 +593,62 @@ public class TestPOSort {
     }
 
     @Test
+    public void testPOSortWithLimit() throws ExecException {
+        // Input tuples (1,8), (2,8), (3,8): they differ only in the first column.
+        DataBag input = DefaultBagFactory.getInstance().newDefaultBag();
+        for (int i = 1; i <= 3; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            t.append(i);
+            t.append(8);
+            input.add(t);
+        }
+
+        List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
+
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        pr1.setResultType(DataType.INTEGER);
+        PhysicalPlan expPlan1 = new PhysicalPlan();
+        expPlan1.add(pr1);
+        sortPlans.add(expPlan1);
+
+        POProject pr2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        pr2.setResultType(DataType.INTEGER);
+        PhysicalPlan expPlan2 = new PhysicalPlan();
+        expPlan2.add(pr2);
+        sortPlans.add(expPlan2);
+
+        // Column 0 descending, column 1 ascending.
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(false);
+        mAscCols.add(true);
+
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                                 sortPlans, mAscCols, null);
+
+        sort.setLimit(1);
+
+        // The first (and only) tuple is the largest by column 0.
+        Result res = sort.getNextTuple();
+        assertEquals(POStatus.STATUS_OK, res.returnStatus);
+        Tuple first = (Tuple) res.result;
+        assertEquals(3, first.get(0));
+        assertEquals(8, first.get(1));
+
+        // With a limit of 1 the operator is exhausted after a single tuple.
+        res = sort.getNextTuple();
+        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+    }
+
+    @Test
     public void testPOSortUDF() throws ExecException {
         DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
                 MAX_TUPLES, 100);

Modified: pig/trunk/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSecondarySort.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSecondarySort.java Wed Apr 12 17:39:34 2017
@@ -542,5 +542,21 @@ public abstract class TestSecondarySort
 
         Util.deleteFile(cluster, clusterPath);
     }
-}
 
+    /**
+     * Once NestedLimitOptimizer has folded the nested LIMIT into the ORDER, the secondary key
+     * optimizer must neither use a secondary key nor remove the nested sort.
+     */
+    @Test
+    public void testNestedLimitedSort() throws Exception {
+        // STORE syntax is "store <alias> into '<location>'"; "in" does not parse.
+        String query = "a = load 'studenttab10k' as (name:chararray, age:int, gpa:double);" +
+                       "b = group a by name;" +
+                       "c = foreach b {" +
+                       "c1 = order a by age;" +
+                       "c2 = limit c1 5;" +
+                       "generate c2;}" +
+                       "store c into 'empty';";
+
+        SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
+
+        assertEquals(0, so.getNumUseSecondaryKey());
+        assertEquals(0, so.getNumSortRemoved());
+    }
+}