You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/03/14 00:21:43 UTC

svn commit: r1300406 - in /pig/trunk: CHANGES.txt src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java test/org/apache/pig/test/TestOptimizeLimit.java

Author: daijy
Date: Tue Mar 13 23:21:42 2012
New Revision: 1300406

URL: http://svn.apache.org/viewvc?rev=1300406&view=rev
Log:
PIG-2570: LimitOptimizer fails with dynamic LIMIT argument

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
    pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1300406&r1=1300405&r2=1300406&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Mar 13 23:21:42 2012
@@ -269,6 +269,8 @@ PIG-2228: support partial aggregation in
 
 BUG FIXES
 
+PIG-2570: LimitOptimizer fails with dynamic LIMIT argument (daijy)
+
 PIG-2543: PigStats.isSuccessful returns false if embedded pig script has sh commands (daijy)
 
 PIG-2509: Util.getSchemaFromString fails with java.lang.NullPointerException when a tuple in a bag has no name (as when used in MongoStorage UDF) (jcoveney via daijy)

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java?rev=1300406&r1=1300405&r2=1300406&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java Tue Mar 13 23:21:42 2012
@@ -121,8 +121,25 @@ public class LimitOptimizer extends Rule
                 // Get operator before LOForEach
                 Operator prepredecessor = currentPlan.getPredecessors(pred)
                     .get(0);
+                
+                List<Operator> softPrepredecessors=null;
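+                // Iterate over a copy of limit's soft-link predecessors: the links are removed and re-created below, and changing them while iterating the plan's own list would throw ConcurrentModificationException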
+                if (currentPlan.getSoftLinkPredecessors(limit)!=null) {
+                    softPrepredecessors=new ArrayList<Operator>(
+                            currentPlan.getSoftLinkPredecessors(limit));
+                }
+                if (softPrepredecessors!=null) {
+                    for (Operator op : softPrepredecessors) {
+                        currentPlan.removeSoftLink(op, limit);
+                    }
+                }
                 currentPlan.removeAndReconnect(limit);
                 currentPlan.insertBetween(prepredecessor, limit, pred);
+                if (softPrepredecessors!=null) {
+                    for (Operator op : softPrepredecessors) {
+                        currentPlan.createSoftLink(op, limit);
+                    }
+                }
             } else if (limit.getLimitPlan() == null) {
                 // TODO selectively enable optimizations for variable limit
                 if (pred instanceof LOCross || pred instanceof LOUnion) {

Modified: pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java?rev=1300406&r1=1300405&r2=1300406&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java Tue Mar 13 23:21:42 2012
@@ -25,6 +25,9 @@ import java.util.*;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOStore;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
 import org.apache.pig.newplan.logical.rules.LimitOptimizer;
@@ -201,6 +204,24 @@ public class TestOptimizeLimit {
     	LogicalPlan plan = Util.buildLp(pigServer, query);
 	    optimizePlan(plan);
     }
+    
+    @Test
+    // See PIG-2570
+    public void testLimitSoftLink() throws Exception {
+        String query = "A = LOAD 'data1.txt' AS (owner:chararray,pet:chararray,age:int,phone:chararray);"
+            + "B = group A all; "
+            + "C = foreach B generate SUM(A.age) as total; "
+            + "D = foreach A generate owner, age/(double)C.total AS percentAge; "
+            + "F = LIMIT D C.total/8;"
+            + "store F into 'output';";
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+        optimizePlan(newLogicalPlan);
+        LOStore store = (LOStore)newLogicalPlan.getSinks().get(0);
+        LOForEach foreach1 = (LOForEach)newLogicalPlan.getPredecessors(store).get(0);
+        LOForEach foreach2 = (LOForEach)newLogicalPlan.getPredecessors(foreach1).get(0);
+        LOLimit limit = (LOLimit)newLogicalPlan.getPredecessors(foreach2).get(0);
+        Assert.assertTrue(newLogicalPlan.getSoftLinkPredecessors(limit).get(0) instanceof LOStore);
+    }
 
 
     public class MyPlanOptimizer extends LogicalPlanOptimizer {