You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/03/14 00:21:43 UTC
svn commit: r1300406 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
test/org/apache/pig/test/TestOptimizeLimit.java
Author: daijy
Date: Tue Mar 13 23:21:42 2012
New Revision: 1300406
URL: http://svn.apache.org/viewvc?rev=1300406&view=rev
Log:
PIG-2570: LimitOptimizer fails with dynamic LIMIT argument
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1300406&r1=1300405&r2=1300406&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Mar 13 23:21:42 2012
@@ -269,6 +269,8 @@ PIG-2228: support partial aggregation in
BUG FIXES
+PIG-2570: LimitOptimizer fails with dynamic LIMIT argument (daijy)
+
PIG-2543: PigStats.isSuccessful returns false if embedded pig script has sh commands (daijy)
PIG-2509: Util.getSchemaFromString fails with java.lang.NullPointerException when a tuple in a bag has no name (as when used in MongoStorage UDF) (jcoveney via daijy)
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java?rev=1300406&r1=1300405&r2=1300406&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java Tue Mar 13 23:21:42 2012
@@ -121,8 +121,25 @@ public class LimitOptimizer extends Rule
// Get operator before LOForEach
Operator prepredecessor = currentPlan.getPredecessors(pred)
.get(0);
+
+ List<Operator> softPrepredecessors=null;
+ // get a clone of softPrepredecessors to avoid ConcurrentModificationException
+ if (currentPlan.getSoftLinkPredecessors(limit)!=null) {
+ softPrepredecessors=new ArrayList<Operator>(
+ currentPlan.getSoftLinkPredecessors(limit));
+ }
+ if (softPrepredecessors!=null) {
+ for (Operator op : softPrepredecessors) {
+ currentPlan.removeSoftLink(op, limit);
+ }
+ }
currentPlan.removeAndReconnect(limit);
currentPlan.insertBetween(prepredecessor, limit, pred);
+ if (softPrepredecessors!=null) {
+ for (Operator op : softPrepredecessors) {
+ currentPlan.createSoftLink(op, limit);
+ }
+ }
} else if (limit.getLimitPlan() == null) {
// TODO selectively enable optimizations for variable limit
if (pred instanceof LOCross || pred instanceof LOUnion) {
Modified: pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java?rev=1300406&r1=1300405&r2=1300406&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java Tue Mar 13 23:21:42 2012
@@ -25,6 +25,9 @@ import java.util.*;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.logical.rules.LimitOptimizer;
@@ -201,6 +204,24 @@ public class TestOptimizeLimit {
LogicalPlan plan = Util.buildLp(pigServer, query);
optimizePlan(plan);
}
+
+ @Test
+ // See PIG-2570
+ public void testLimitSoftLink() throws Exception {
+ String query = "A = LOAD 'data1.txt' AS (owner:chararray,pet:chararray,age:int,phone:chararray);"
+ + "B = group A all; "
+ + "C = foreach B generate SUM(A.age) as total; "
+ + "D = foreach A generate owner, age/(double)C.total AS percentAge; "
+ + "F = LIMIT D C.total/8;"
+ + "store F into 'output';";
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+ optimizePlan(newLogicalPlan);
+ LOStore store = (LOStore)newLogicalPlan.getSinks().get(0);
+ LOForEach foreach1 = (LOForEach)newLogicalPlan.getPredecessors(store).get(0);
+ LOForEach foreach2 = (LOForEach)newLogicalPlan.getPredecessors(foreach1).get(0);
+ LOLimit limit = (LOLimit)newLogicalPlan.getPredecessors(foreach2).get(0);
+ Assert.assertTrue(newLogicalPlan.getSoftLinkPredecessors(limit).get(0) instanceof LOStore);
+ }
public class MyPlanOptimizer extends LogicalPlanOptimizer {