You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/02/18 00:16:58 UTC
svn commit: r911219 - in /hadoop/pig/trunk: CHANGES.txt
src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
test/org/apache/pig/test/TestMultiQuery.java
Author: rding
Date: Wed Feb 17 23:16:58 2010
New Revision: 911219
URL: http://svn.apache.org/viewvc?rev=911219&view=rev
Log:
PIG-1169: Top-N queries produce incorrect results when a store statement is added between order by and limit statements
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=911219&r1=911218&r2=911219&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Feb 17 23:16:58 2010
@@ -95,6 +95,8 @@
BUG FIXES
+PIG-1169: Top-N queries produce incorrect results when a store statement is added between order by and limit statement (rding)
+
PIG-1131: Pig simple join does not work when it contains empty lines (ashutoshc)
PIG-834: incorrect plan when algebraic functions are nested (ashutoshc)
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=911219&r1=911218&r2=911219&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java Wed Feb 17 23:16:58 2010
@@ -40,7 +40,11 @@
import org.apache.pig.impl.logicalLayer.LOJoin;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LogicalPlanCloner;
import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
/**
@@ -138,7 +142,7 @@
// Limit cannot be pushed up
if (predecessor instanceof LOCogroup || predecessor instanceof LOFilter ||
predecessor instanceof LOLoad || predecessor instanceof LOSplit ||
- predecessor instanceof LOSplitOutput || predecessor instanceof LODistinct || predecessor instanceof LOJoin)
+ predecessor instanceof LODistinct || predecessor instanceof LOJoin)
{
return;
}
@@ -234,6 +238,48 @@
throw new OptimizerException(msg, errCode, PigException.BUG, e);
}
}
+ // Limit and OrderBy (LOSort) can be separated by split
+ else if (predecessor instanceof LOSplitOutput) {
+ if(mode == ExecType.LOCAL) {
+ // This optimisation is not needed in local mode,
+ // so we do nothing here.
+ } else {
+ List<LogicalOperator> grandparants = mPlan
+ .getPredecessors(predecessor);
+ // After insertion of splitters, any node in the plan can
+ // have at most one predecessor
+ if (grandparants != null && grandparants.size() != 0
+ && grandparants.get(0) instanceof LOSplit) {
+ List<LogicalOperator> greatGrandparants = mPlan
+ .getPredecessors(grandparants.get(0));
+ if (greatGrandparants != null
+ && greatGrandparants.size() != 0
+ && greatGrandparants.get(0) instanceof LOSort) {
+ LOSort sort = (LOSort)greatGrandparants.get(0);
+ LOSort newSort = new LOSort(
+ sort.getPlan(),
+ new OperatorKey(
+ sort.getOperatorKey().scope,
+ NodeIdGenerator
+ .getGenerator()
+ .getNextNodeId(
+ sort.getOperatorKey().scope)),
+ sort.getSortColPlans(),
+ sort.getAscendingCols(),
+ sort.getUserFunc());
+
+ newSort.setLimit(limit.getLimit());
+ try {
+ mPlan.replace(limit, newSort);
+ } catch (PlanException e) {
+ int errCode = 2012;
+ String msg = "Can not replace LOLimit with LOSort after splitter";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ }
+ }
+ }
else {
int errCode = 2013;
String msg = "Moving LOLimit in front of " + predecessor.getClass().getSimpleName() + " is not implemented";
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=911219&r1=911218&r2=911219&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Wed Feb 17 23:16:58 2010
@@ -88,6 +88,66 @@
myPig = null;
}
+ public void testMultiQueryJiraPig1169() {
+
+ // test case (PIG-1169): a top-N query (order by + limit) must still return the top N rows when the sorted relation is also stored
+
+ String INPUT_FILE = "abc";
+
+ try {
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("1\t2\t3");
+ w.println("2\t3\t4");
+ w.println("3\t4\t5");
+ w.println("5\t6\t7");
+ w.println("6\t7\t8");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("A = load '" + INPUT_FILE
+ + "' as (a:int, b, c);");
+ myPig.registerQuery("A1 = Order A by a desc parallel 3;");
+ myPig.registerQuery("A2 = limit A1 2;");
+ myPig.registerQuery("store A1 into '/tmp/input1';");
+ myPig.registerQuery("store A2 into '/tmp/input2';");
+
+ myPig.executeBatch();
+
+ myPig.registerQuery("B = load '/tmp/input2' as (a:int, b, c);");
+
+ Iterator<Tuple> iter = myPig.openIterator("B");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(6,7,8)",
+ "(5,6,7)"
+ });
+
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ }
+
+ assertEquals(expectedResults.size(), counter);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
public void testMultiQueryJiraPig1171() {
// test case: Problems with some top N queries