You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2017/04/12 17:39:35 UTC
svn commit: r1791166 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/backend/hadoop/executionengine/util/
src/org/apache/pig/data/ src/org/apache/pig/newplan/logical/optimizer/...
Author: daijy
Date: Wed Apr 12 17:39:34 2017
New Revision: 1791166
URL: http://svn.apache.org/viewvc?rev=1791166&view=rev
Log:
PIG-5211: Optimize Nested Limited Sort
Added:
pig/trunk/src/org/apache/pig/data/LimitedSortedDataBag.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java
pig/trunk/test/org/apache/pig/test/TestOptimizeNestedLimit.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
pig/trunk/src/org/apache/pig/data/BagFactory.java
pig/trunk/src/org/apache/pig/data/DefaultBagFactory.java
pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
pig/trunk/test/org/apache/pig/test/NonDefaultBagFactory.java
pig/trunk/test/org/apache/pig/test/TestDataBag.java
pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java
pig/trunk/test/org/apache/pig/test/TestPOSort.java
pig/trunk/test/org/apache/pig/test/TestSecondarySort.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Apr 12 17:39:34 2017
@@ -36,6 +36,8 @@ PIG-5067: Revisit union on numeric type
IMPROVEMENTS
+PIG-5211: Optimize Nested Limited Sort (jins via daijy)
+
PIG-5214: search any substring in the input string (rainer-46 via daijy)
PIG-5210: Option to print MR/Tez plan before launching (ly16 via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Wed Apr 12 17:39:34 2017
@@ -267,10 +267,15 @@ public class POSort extends PhysicalOper
}
}
}
- // by default, we create InternalSortedBag, unless user configures
- // explicitly to use old bag
- sortedBag = useDefaultBag ? mBagFactory.newSortedBag(mComparator)
- : new InternalSortedBag(3, mComparator);
+
+ if (isLimited()) {
+ sortedBag = mBagFactory.newLimitedSortedBag(mComparator, limit);
+ } else {
+ // by default, we create InternalSortedBag, unless user configures
+ // explicitly to use old bag
+ sortedBag = useDefaultBag ? mBagFactory.newSortedBag(mComparator)
+ : new InternalSortedBag(3, mComparator);
+ }
while (inp.returnStatus != POStatus.STATUS_EOP) {
if (inp.returnStatus == POStatus.STATUS_ERR) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Wed Apr 12 17:39:34 2017
@@ -569,6 +569,12 @@ public class SecondaryKeyOptimizerUtil {
// We see POSort, check which key it is using
public boolean processSort(POSort sort) throws FrontendException{
+
+ // if sort has a limit, it is already optimized by NestedLimitedSort optimizer
+ if (sort.isLimited()) {
+ return true;
+ }
+
SortKeyInfo keyInfo = new SortKeyInfo();
for (int i = 0; i < sort.getSortPlans().size(); i++) {
PhysicalPlan sortPlan = sort.getSortPlans().get(i);
Modified: pig/trunk/src/org/apache/pig/data/BagFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/BagFactory.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/BagFactory.java (original)
+++ pig/trunk/src/org/apache/pig/data/BagFactory.java Wed Apr 12 17:39:34 2017
@@ -98,7 +98,17 @@ public abstract class BagFactory {
* @return default data bag.
*/
public abstract DataBag newDefaultBag(List<Tuple> listOfTuples);
-
+
+ /**
+ * Get a limited sorted data bag. Limited sorted bags are sorted bags
+ * with number of elements no more than limit. This is the bag used by a
+ * nested ORDER whose result is immediately LIMITed, so only the first
+ * {@code limit} tuples in comparator order need to be retained.
+ * @param comp Comparator that controls how the data is sorted.
+ * If null, default comparator will be used.
+ * @param limit max number of tuples in bag
+ * @return a sorted data bag holding at most {@code limit} tuples
+ */
+ public abstract DataBag newLimitedSortedBag(Comparator<Tuple> comp, long limit);
+
/**
* Get a sorted data bag. Sorted bags guarantee that when an iterator
* is opened on the bag the tuples will be returned in sorted order.
Modified: pig/trunk/src/org/apache/pig/data/DefaultBagFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DefaultBagFactory.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DefaultBagFactory.java (original)
+++ pig/trunk/src/org/apache/pig/data/DefaultBagFactory.java Wed Apr 12 17:39:34 2017
@@ -46,6 +46,17 @@ public class DefaultBagFactory extends B
}
/**
+ * Get a limited sorted data bag.
+ * @param comp Comparator that controls how the data is sorted.
+ * If null, default comparator will be used.
+ */
+ @Override
+ public DataBag newLimitedSortedBag(Comparator<Tuple> comp, long limit) {
+ DataBag b = new LimitedSortedDataBag(comp, limit);
+ return b;
+ }
+
+ /**
* Get a sorted data bag.
* @param comp Comparator that controls how the data is sorted.
* If null, default comparator will be used.
Added: pig/trunk/src/org/apache/pig/data/LimitedSortedDataBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/LimitedSortedDataBag.java?rev=1791166&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/LimitedSortedDataBag.java (added)
+++ pig/trunk/src/org/apache/pig/data/LimitedSortedDataBag.java Wed Apr 12 17:39:34 2017
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+
+
+/**
+ * An ordered collection of Tuples (possibly) with multiples that retains at
+ * most {@code limit} tuples: the smallest ones according to the comparator.
+ * Data is stored in a bounded priority queue as it comes in, and is only
+ * fully sorted when an iterator is requested.
+ *
+ * LimitedSortedDataBag is not spillable.
+ *
+ * We allow a user defined comparator, but provide a default comparator in
+ * cases where the user doesn't specify one.
+ */
+public class LimitedSortedDataBag implements DataBag {
+
+    private static final Log log = LogFactory.getLog(LimitedSortedDataBag.class);
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Cap on the initial capacity of the backing queue. The queue grows on
+     * demand, so a large limit must not eagerly allocate a huge array.
+     */
+    private static final int MAX_INITIAL_CAPACITY = 1024;
+
+    private final Comparator<Tuple> mComp;
+    // Ordered by the REVERSED comparator, so the head is the largest retained
+    // tuple, i.e. the one to evict once the bag exceeds its limit.
+    private final PriorityQueue<Tuple> priorityQ;
+    private final long limit;
+
+    /**
+     * @param comp Comparator to use to do the sorting.
+     * If null, DefaultComparator will be used.
+     * @param limit maximum number of tuples the bag retains; must be >= 0.
+     * @throws IllegalArgumentException if limit is negative.
+     */
+    public LimitedSortedDataBag(Comparator<Tuple> comp, long limit) {
+        if (limit < 0) {
+            throw new IllegalArgumentException(
+                    "LimitedSortedDataBag limit must be non-negative: " + limit);
+        }
+        this.mComp = comp == null ? new DefaultComparator() : comp;
+        this.limit = limit;
+        // PriorityQueue rejects an initial capacity < 1 (so a limit of 0 used
+        // to fail), and casting a long limit to int can overflow; clamp the
+        // initial capacity to [1, MAX_INITIAL_CAPACITY].
+        int initialCapacity = (int) Math.max(1L, Math.min(limit, MAX_INITIAL_CAPACITY));
+        // Collections.reverseOrder swaps the arguments instead of negating the
+        // result, so it is also correct for comparators returning MIN_VALUE.
+        this.priorityQ = new PriorityQueue<Tuple>(
+                initialCapacity, Collections.reverseOrder(mComp));
+    }
+
+    /**
+     * Get the number of elements in the bag in memory.
+     * @return number of elements in the bag (never more than the limit)
+     */
+    @Override
+    public long size() {
+        return priorityQ.size();
+    }
+
+    /**
+     * Find out if the bag is sorted.
+     * @return always true; iteration order follows the comparator.
+     */
+    @Override
+    public boolean isSorted() {
+        return true;
+    }
+
+    /**
+     * Find out if the bag is distinct.
+     * @return always false; duplicates are retained.
+     */
+    @Override
+    public boolean isDistinct() {
+        return false;
+    }
+
+    /**
+     * Get an iterator to the bag. Tuples are returned in ascending order
+     * according to the bag's comparator. The iterator works on a snapshot
+     * taken when it is created.
+     * @return tuple iterator
+     */
+    @Override
+    public Iterator<Tuple> iterator() {
+        return new LimitedSortedDataBagIterator();
+    }
+
+    /**
+     * Add a tuple to the bag. If the bag then holds more than {@code limit}
+     * tuples, the largest one (per the comparator) is discarded.
+     * @param t tuple to add.
+     */
+    @Override
+    public void add(Tuple t) {
+        priorityQ.add(t);
+        if (priorityQ.size() > limit) {
+            // Head of the reversed queue is the current maximum.
+            priorityQ.poll();
+        }
+    }
+
+    /**
+     * Add contents of a bag to the bag.
+     * @param b bag to add contents of.
+     */
+    @Override
+    public void addAll(DataBag b) {
+        Iterator<Tuple> it = b.iterator();
+        while (it.hasNext()) {
+            add(it.next());
+        }
+    }
+
+    /**
+     * Clear out the contents of the bag. The bag is memory-only, so there
+     * is nothing on disk to remove.
+     */
+    @Override
+    public void clear() {
+        priorityQ.clear();
+    }
+
+    /**
+     * Write a bag's contents to disk.
+     * @param out DataOutput to write data to.
+     * @throws IOException (passes it on from underlying calls).
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        // Writing through the iterator emits tuples already sorted, so the
+        // reader does not need to re-sort them.
+        out.writeLong(size());
+        Iterator<Tuple> it = iterator();
+        while (it.hasNext()) {
+            it.next().write(out);
+        }
+    }
+
+    /**
+     * Read a bag from disk. Tuples are added through {@link #add(Tuple)},
+     * so the limit still applies.
+     * @param in DataInput to read data from.
+     * @throws IOException (passes it on from underlying calls).
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        long size = in.readLong();
+        for (long i = 0; i < size; i++) {
+            add((Tuple) DataReaderWriter.readDatum(in));
+        }
+    }
+
+    /**
+     * Write the bag into a string, in sorted order.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append('{');
+        Iterator<Tuple> it = iterator();
+        while (it.hasNext()) {
+            sb.append(it.next().toString());
+            if (it.hasNext()) sb.append(",");
+        }
+        sb.append('}');
+        return sb.toString();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int compareTo(Object other) {
+        if (this == other)
+            return 0;
+        if (other instanceof DataBag) {
+            DataBag bOther = (DataBag) other;
+            if (this.size() != bOther.size()) {
+                return this.size() > bOther.size() ? 1 : -1;
+            }
+
+            // Same size, hence no larger than this bag's limit: copying the
+            // other bag into a LimitedSortedDataBag with the same comparator
+            // and limit sorts it without dropping anything, so both iterators
+            // can be walked in lockstep.
+            DataBag otherSorted = new LimitedSortedDataBag(mComp, limit);
+            otherSorted.addAll(bOther);
+
+            Iterator<Tuple> thisIt = this.iterator();
+            Iterator<Tuple> otherIt = otherSorted.iterator();
+            while (thisIt.hasNext() && otherIt.hasNext()) {
+                int c = thisIt.next().compareTo(otherIt.next());
+                if (c != 0) return c;
+            }
+            return 0; // if we got this far, they must be equal
+        } else {
+            return DataType.compare(this, other);
+        }
+    }
+
+    /**
+     * Not implemented.
+     * This is used by FuncEvalSpec.FakeDataBag.
+     * @param stale Set stale state.
+     */
+    @Override
+    public void markStale(boolean stale) {
+        throw new RuntimeException("LimitedSortedDataBag cannot be marked stale");
+    }
+
+    /**
+     * Not implemented: this bag never spills.
+     * @return always 0
+     */
+    @Override
+    public long spill() {
+        return 0;
+    }
+
+    /**
+     * Not implemented.
+     * @return always 0
+     */
+    @Override
+    public long getMemorySize() {
+        return 0;
+    }
+
+    /** Falls back to the tuples' natural ordering. */
+    private static class DefaultComparator implements Comparator<Tuple> {
+        @Override
+        @SuppressWarnings("unchecked")
+        public int compare(Tuple t1, Tuple t2) {
+            return t1.compareTo(t2);
+        }
+
+        // All DefaultComparator instances impose the same ordering, so they
+        // are interchangeable (and equals is now reflexive).
+        @Override
+        public boolean equals(Object o) {
+            return o instanceof DefaultComparator;
+        }
+
+        @Override
+        public int hashCode() {
+            return 42;
+        }
+    }
+
+    /**
+     * An iterator that handles getting the next tuple from the bag.
+     * Since the priority queue iterator does not return elements in any
+     * order, the elements are copied into a List and sorted, and that
+     * snapshot is iterated.
+     */
+    private class LimitedSortedDataBagIterator implements Iterator<Tuple> {
+
+        private int mCntr;
+        private final List<Tuple> mContents;
+
+        public LimitedSortedDataBagIterator() {
+            mCntr = 0;
+            mContents = new ArrayList<>(priorityQ);
+            Collections.sort(mContents, mComp);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return (mCntr < mContents.size());
+        }
+
+        @Override
+        public Tuple next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            // This will report progress every 1024 times through next.
+            // This should be much faster than using mod.
+            if ((mCntr & 0x3ff) == 0) reportProgress();
+
+            return mContents.get(mCntr++);
+        }
+
+        /**
+         * Not supported: the iterator walks a read-only snapshot.
+         */
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Cannot remove() from LimitedSortedDataBag.iterator()");
+        }
+    }
+
+    /**
+     * Report progress to HDFS.
+     */
+    protected void reportProgress() {
+        if (PhysicalOperator.getReporter() != null) {
+            PhysicalOperator.getReporter().progress();
+        }
+    }
+}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Wed Apr 12 17:39:34 2017
@@ -40,6 +40,7 @@ import org.apache.pig.newplan.logical.ru
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.logical.rules.MergeFilter;
import org.apache.pig.newplan.logical.rules.MergeForEach;
+import org.apache.pig.newplan.logical.rules.NestedLimitOptimizer;
import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
import org.apache.pig.newplan.logical.rules.PredicatePushdownOptimizer;
import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
@@ -202,6 +203,15 @@ public class LogicalPlanOptimizer extend
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
+
+ // Nested Limit Set
+ // This set of rules push up nested limit
+ s = new HashSet<Rule>();
+ // Optimize limit
+ r = new NestedLimitOptimizer("NestedLimitOptimizer");
+ checkAndAddRule(s, r);
+ if (!s.isEmpty())
+ ls.add(s);
return ls;
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Wed Apr 12 17:39:34 2017
@@ -259,6 +259,7 @@ public class LogicalPlan extends BaseOpe
disabledOptimizerRules.add("MergeForEach");
disabledOptimizerRules.add("PartitionFilterOptimizer");
disabledOptimizerRules.add("LimitOptimizer");
+ disabledOptimizerRules.add("NestedLimitOptimizer");
disabledOptimizerRules.add("SplitFilter");
disabledOptimizerRules.add("PushUpFilter");
disabledOptimizerRules.add("MergeFilter");
Added: pig/trunk/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java?rev=1791166&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java Wed Apr 12 17:39:34 2017
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+public class NestedLimitOptimizer extends Rule {
+
+ public NestedLimitOptimizer(String name) {
+ super(name, false);
+ }
+
+ @Override
+ protected OperatorPlan buildPattern() {
+ LogicalPlan plan = new LogicalPlan();
+ LogicalRelationalOperator forEach = new LOForEach(plan);
+ plan.add(forEach);
+ return plan;
+ }
+
+ @Override
+ public Transformer getNewTransformer() {
+ return new OptimizeNestedLimitTransformer();
+ }
+
+ public class OptimizeNestedLimitTransformer extends Transformer {
+
+ @Override
+ public boolean check(OperatorPlan matched) {
+
+ LOForEach forEach = (LOForEach) matched.getSources().get(0);
+ LogicalPlan innerPlan = forEach.getInnerPlan();
+
+ // check if there is a LOSort immediately followed by LOLimit in innerPlan
+ Iterator<Operator> it = innerPlan.getOperators();
+
+ while(it.hasNext()) {
+ Operator op = it.next();
+ if (op instanceof LOLimit) {
+ List<Operator> preds = innerPlan.getPredecessors(op);
+ // Limit should always have exactly 1 predecessor
+ if (preds.get(0) instanceof LOSort) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public OperatorPlan reportChanges() {
+ return currentPlan;
+ }
+
+ @Override
+ public void transform(OperatorPlan matched) throws FrontendException {
+
+ LOForEach forEach = (LOForEach) matched.getSources().get(0);
+ LogicalPlan innerPlan = forEach.getInnerPlan();
+
+ // Get LOSort and LOLimit from innerPlan
+ Iterator<Operator> it = innerPlan.getOperators();
+
+ LOLimit limit = null;
+ LOSort sort = null;
+ while(it.hasNext()) {
+ Operator op = it.next();
+ if (op instanceof LOLimit) {
+ List<Operator> preds = innerPlan.getPredecessors(op);
+ // Limit should always have exactly 1 predecessor
+ if (preds.get(0) instanceof LOSort) {
+ limit = (LOLimit) op;
+ sort = (LOSort) (preds.get(0));
+ break;
+ }
+ }
+ }
+
+ // set limit in LOSort, and remove LOLimit
+ sort.setLimit(limit.getLimit());
+ innerPlan.removeAndReconnect(limit);
+ }
+ }
+}
Modified: pig/trunk/test/org/apache/pig/test/NonDefaultBagFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/NonDefaultBagFactory.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/NonDefaultBagFactory.java (original)
+++ pig/trunk/test/org/apache/pig/test/NonDefaultBagFactory.java Wed Apr 12 17:39:34 2017
@@ -33,6 +33,7 @@ public class NonDefaultBagFactory extend
public DataBag newDefaultBag(List<Tuple> listOfTuples) {
return null;
}
+ public DataBag newLimitedSortedBag(Comparator<Tuple> comp, long limit) { return null; }
public DataBag newSortedBag(Comparator<Tuple> comp) { return null; }
public DataBag newDistinctBag() { return null; }
Modified: pig/trunk/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestDataBag.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestDataBag.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestDataBag.java Wed Apr 12 17:39:34 2017
@@ -34,6 +34,7 @@ import java.util.PriorityQueue;
import java.util.Random;
import java.util.TreeSet;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DefaultDataBag;
@@ -42,6 +43,7 @@ import org.apache.pig.data.DistinctDataB
import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.InternalDistinctBag;
import org.apache.pig.data.InternalSortedBag;
+import org.apache.pig.data.LimitedSortedDataBag;
import org.apache.pig.data.NonSpillableDataBag;
import org.apache.pig.data.SingleTupleBag;
import org.apache.pig.data.SortedDataBag;
@@ -741,10 +743,13 @@ public class TestDataBag {
DataBag bag = f.newDefaultBag();
DataBag sorted = f.newSortedBag(null);
+ DataBag limitedSorted = f.newLimitedSortedBag(null, 1);
DataBag distinct = f.newDistinctBag();
assertTrue("Expected a default bag", (bag instanceof DefaultDataBag));
assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag));
+ assertTrue("Expected a limited sorted bag",
+ (limitedSorted instanceof LimitedSortedDataBag));
assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
}
@@ -1231,7 +1236,42 @@ public class TestDataBag {
assertFalse(iter.hasNext());
assertFalse("hasNext should be idempotent", iter.hasNext());
}
-}
-
+    @Test
+    public void testLimitedSortedBag() throws ExecException {
+        DataBag bag = new LimitedSortedDataBag(null, 2);
+        // Insert out of order; with limit 2 only the two smallest values
+        // (0 and 1) must survive.
+        for (int v : new int[] {2, 0, 1}) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, v);
+            bag.add(t);
+        }
+        // test size(): bounded by the limit
+        assertEquals(2, bag.size());
+        // test isSorted()
+        assertTrue(bag.isSorted());
+        // test isDistinct()
+        assertFalse(bag.isDistinct());
+        // test iterator(): ascending order, largest value evicted
+        Iterator<Tuple> it = bag.iterator();
+        assertEquals(0, it.next().get(0));
+        assertEquals(1, it.next().get(0));
+        assertFalse(it.hasNext());
+        // test addAll(): a limit-1 bag keeps only the smallest tuple
+        DataBag bag1 = new LimitedSortedDataBag(null, 1);
+        bag1.addAll(bag);
+        assertEquals(1, bag1.size());
+        assertEquals(0, bag1.iterator().next().get(0));
+        // test compareTo(): equal to itself, larger bag compares greater
+        assertEquals(0, bag.compareTo(bag));
+        assertEquals(1, bag.compareTo(bag1));
+        // test clear()
+        bag1.clear();
+        assertEquals(0, bag1.size());
+    }
+}
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java Wed Apr 12 17:39:34 2017
@@ -35,10 +35,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import java.util.Set;
import java.util.StringTokenizer;
-import junit.framework.Assert;
-
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
@@ -73,8 +72,10 @@ import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
+import junit.framework.Assert;
+
public class TestEvalPipelineLocal {
-
+
private PigServer pigServer;
static final int MAX_SIZE = 100000;
@@ -1289,4 +1290,38 @@ public class TestEvalPipelineLocal {
Assert.assertTrue(bos.toString().contains("New For Each(false,false)[tuple]"));
}
+
+ @Test
+ public void testNestedLimitedSort() throws Exception {
+
+ File f1 = createFile(new String[]{
+ "katie carson\t25\t3.65",
+ "katie carson\t65\t0.73",
+ "katie carson\t57\t2.43",
+ "katie carson\t55\t3.77",
+ "holly white\t43\t0.24"});
+
+ String query = "a = load '" + Util.generateURI(f1.toString(), pigServer.getPigContext()) +
+ "' as (name:chararray,age:int, gpa:double);" +
+ "b = group a by name;" +
+ "c = foreach b {" +
+ "c1 = a.(age, gpa);" +
+ "c2 = order c1 by age;" +
+ "c3 = limit c2 3;" +
+ "generate c3;}";
+
+ pigServer.registerQuery(query);
+ Iterator<Tuple> iter = pigServer.openIterator("c");
+
+ Set<String> expectedResultSet = new HashSet<>();
+ expectedResultSet.add("({(25,3.65),(55,3.77),(57,2.43)})");
+ expectedResultSet.add("({(43,0.24)})");
+
+ Set<String> resultSet = new HashSet<>();
+ resultSet.add(iter.next().toString());
+ resultSet.add(iter.next().toString());
+
+ assertTrue(resultSet.equals(expectedResultSet));
+ assertFalse(iter.hasNext());
+ }
}
Added: pig/trunk/test/org/apache/pig/test/TestOptimizeNestedLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestOptimizeNestedLimit.java?rev=1791166&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestOptimizeNestedLimit.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestOptimizeNestedLimit.java Wed Apr 12 17:39:34 2017
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.rules.NestedLimitOptimizer;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestOptimizeNestedLimit {
+
+    /** Limit applied by the nested LIMIT in the query built in {@link #setup()}. */
+    private static final long EXPECTED_LIMIT = 5;
+
+    LogicalPlan lp;
+    PhysicalPlan php;
+    MROperPlan mrp;
+
+    /**
+     * Builds three plans for a FOREACH whose inner plan is ORDER followed by
+     * LIMIT:
+     * <ul>
+     * <li>a logical plan run through an optimizer that only applies
+     * {@link NestedLimitOptimizer};</li>
+     * <li>a physical plan built by {@code Util.buildPp} from the same query;</li>
+     * <li>an MR plan compiled from that physical plan.</li>
+     * </ul>
+     */
+    @Before
+    public void setup() throws Exception {
+        PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
+        PigServer pigServer = new PigServer(pc);
+        String query = "A = load 'myfile';" +
+                "B = group A by $0;" +
+                "C = foreach B {" +
+                "C1 = order A by $1;" +
+                "C2 = limit C1 " + EXPECTED_LIMIT + ";" +
+                "generate C2;}" +
+                "store C into 'empty';";
+        lp = optimizePlan(Util.buildLp(pigServer, query));
+        php = Util.buildPp(pigServer, query);
+        mrp = Util.buildMRPlanWithOptimizer(php, pc);
+    }
+
+    @After
+    public void tearDown() {
+        // Nothing to release: setup() rebuilds the plans before every test.
+    }
+
+    /**
+     * The optimized logical plan must contain a single LOForEach whose inner
+     * plan has one LOSort carrying the limit and no LOLimit.
+     */
+    @Test
+    public void testLogicalPlan() throws Exception {
+        LOForEach forEach = null;
+        Iterator<Operator> it = lp.getOperators();
+        while (it.hasNext()) {
+            Operator op = it.next();
+            if (op instanceof LOForEach) {
+                assertNull("There should be only one LOForEach", forEach);
+                forEach = (LOForEach) op;
+            }
+        }
+        assertNotNull("ForEach is missing", forEach);
+
+        LOSort sort = null;
+        it = forEach.getInnerPlan().getOperators();
+        while (it.hasNext()) {
+            Operator op = it.next();
+            if (op instanceof LOLimit) {
+                fail("There should be no LOLimit");
+            } else if (op instanceof LOSort) {
+                assertNull("There should be only one LOSort", sort);
+                sort = (LOSort) op;
+            }
+        }
+        assertNotNull("LOSort is missing", sort);
+        assertEquals(EXPECTED_LIMIT, sort.getLimit());
+    }
+
+    /** The physical plan must carry the limit on the nested POSort. */
+    @Test
+    public void testPhysicalPlan() throws Exception {
+        assertLimitMergedIntoSort(php);
+    }
+
+    /**
+     * The MR plan must have a single leaf whose reduce plan carries the limit
+     * on the nested POSort.
+     */
+    @Test
+    public void testMRPlan() throws Exception {
+        List<MapReduceOper> ops = mrp.getLeaves();
+        assertEquals(1, ops.size());
+        assertLimitMergedIntoSort(ops.get(0).reducePlan);
+    }
+
+    /**
+     * Asserts that {@code plan} holds exactly one POForEach. That POForEach
+     * must have a single input plan containing exactly one POSort with
+     * {@link #EXPECTED_LIMIT} and no POLimit.
+     */
+    private static void assertLimitMergedIntoSort(PhysicalPlan plan) {
+        POForEach forEach = null;
+        Iterator<PhysicalOperator> it = plan.iterator();
+        while (it.hasNext()) {
+            PhysicalOperator op = it.next();
+            if (op instanceof POForEach) {
+                assertNull("There should be only 1 POForEach", forEach);
+                forEach = (POForEach) op;
+            }
+        }
+        assertNotNull("POForEach is missing", forEach);
+
+        List<PhysicalPlan> inps = forEach.getInputPlans();
+        assertEquals(1, inps.size());
+
+        POSort sort = null;
+        it = inps.get(0).iterator();
+        while (it.hasNext()) {
+            PhysicalOperator op = it.next();
+            if (op instanceof POLimit) {
+                fail("There should be no POLimit");
+            } else if (op instanceof POSort) {
+                assertNull("There should be only 1 POSort", sort);
+                sort = (POSort) op;
+            }
+        }
+        assertNotNull("POSort is missing", sort);
+        assertEquals(EXPECTED_LIMIT, sort.getLimit());
+    }
+
+    /**
+     * Logical optimizer whose only rule is {@link NestedLimitOptimizer}.
+     * It is a static nested class because it needs no state from the test
+     * instance.
+     */
+    private static class MyPlanOptimizer extends LogicalPlanOptimizer {
+        MyPlanOptimizer(OperatorPlan p, int iterations) {
+            super(p, iterations, new HashSet<String>());
+        }
+
+        @Override
+        protected List<Set<Rule>> buildRuleSets() {
+            Set<Rule> rules = new HashSet<Rule>();
+            rules.add(new NestedLimitOptimizer("OptimizeNestedLimit"));
+
+            List<Set<Rule>> ruleSets = new ArrayList<Set<Rule>>();
+            ruleSets.add(rules);
+            return ruleSets;
+        }
+    }
+
+    /** Applies {@link MyPlanOptimizer} to {@code plan} in place and returns it. */
+    private static LogicalPlan optimizePlan(LogicalPlan plan) throws IOException {
+        PlanOptimizer optimizer = new MyPlanOptimizer(plan, 3);
+        optimizer.optimize();
+        return plan;
+    }
+}
Modified: pig/trunk/test/org/apache/pig/test/TestPOSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOSort.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOSort.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOSort.java Wed Apr 12 17:39:34 2017
@@ -593,6 +593,62 @@ public class TestPOSort {
}
@Test
+    public void testPOSortWithLimit() throws ExecException {
+        // Sort the tuples (1,8), (2,8), (3,8) by $0 descending, then $1
+        // ascending, with a limit of 1. Only (3,8) may be emitted, followed
+        // by EOP.
+        DataBag input = DefaultBagFactory.getInstance().newDefaultBag();
+        TupleFactory tupleFactory = TupleFactory.getInstance();
+        for (int i = 1; i <= 3; i++) {
+            Tuple t = tupleFactory.newTuple();
+            t.append(i);
+            t.append(8);
+            input.add(t);
+        }
+
+        List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
+
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        pr1.setResultType(DataType.INTEGER);
+        PhysicalPlan expPlan1 = new PhysicalPlan();
+        expPlan1.add(pr1);
+        sortPlans.add(expPlan1);
+
+        POProject pr2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        pr2.setResultType(DataType.INTEGER);
+        PhysicalPlan expPlan2 = new PhysicalPlan();
+        expPlan2.add(pr2);
+        sortPlans.add(expPlan2);
+
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(false);
+        mAscCols.add(true);
+
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                sortPlans, mAscCols, null);
+        sort.setLimit(1);
+
+        // Check the status before casting, so a wrong status fails as an
+        // assertion rather than a ClassCastException/NullPointerException.
+        Result res = sort.getNextTuple();
+        assertEquals(POStatus.STATUS_OK, res.returnStatus);
+        Tuple first = (Tuple) res.result;
+        assertEquals(3, first.get(0));
+        assertEquals(8, first.get(1));
+
+        // The limit must end the output after a single tuple.
+        res = sort.getNextTuple();
+        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+    }
+
+ @Test
public void testPOSortUDF() throws ExecException {
DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
MAX_TUPLES, 100);
Modified: pig/trunk/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSecondarySort.java?rev=1791166&r1=1791165&r2=1791166&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSecondarySort.java Wed Apr 12 17:39:34 2017
@@ -542,5 +542,21 @@ public abstract class TestSecondarySort
Util.deleteFile(cluster, clusterPath);
}
-}
+    /**
+     * Once NestedLimitOptimizer has folded the nested LIMIT into the ORDER,
+     * the secondary key optimizer must neither use a secondary key nor
+     * remove the sort.
+     */
+    @Test
+    public void testNestedLimitedSort() throws Exception {
+        String query = "a = load 'studenttab10k' as (name:chararray, age:int, gpa:double);" +
+                "b = group a by name;" +
+                "c = foreach b {" +
+                "c1 = order a by age;" +
+                "c2 = limit c1 5;" +
+                "generate c2;}" +
+                // Pig Latin STORE syntax is: store <alias> into '<path>';
+                "store c into 'empty';";
+
+        SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
+
+        assertEquals(0, so.getNumUseSecondaryKey());
+        assertEquals(0, so.getNumSortRemoved());
+    }
+}