You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/02/08 19:28:00 UTC

svn commit: r1566083 - in /pig/trunk: CHANGES.txt src/org/apache/pig/newplan/logical/relational/LOForEach.java test/org/apache/pig/test/TestNewPlanFilterRule.java

Author: daijy
Date: Sat Feb  8 18:28:00 2014
New Revision: 1566083

URL: http://svn.apache.org/r1566083
Log:
PIG-3347: Store invocation brings side effect

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java
    pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1566083&r1=1566082&r2=1566083&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Feb  8 18:28:00 2014
@@ -213,6 +213,8 @@ PIG-3480: TFile-based tmpfile compressio
 
 BUG FIXES
 
+PIG-3347: Store invocation brings side effect (daijy)
+
 PIG-3670: Fix assert in Pig script (daijy)
 
 PIG-3741: Utils.setTmpFileCompressionOnConf can cause side effect for SequenceFileInterStorage (aniket486)

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java?rev=1566083&r1=1566082&r2=1566083&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java Sat Feb  8 18:28:00 2014
@@ -77,44 +77,25 @@ public class LOForEach extends LogicalRe
     }
     
     // Find the LOInnerLoad of the inner plan corresponding to the project, and 
-    // also find whether there is a LOForEach in inner plan along the way
+    // also find whether there is a relational operator in inner plan along the way
     public static Pair<List<LOInnerLoad>, Boolean> findReacheableInnerLoadFromBoundaryProject(ProjectExpression project) throws FrontendException {
         boolean needNewUid = false;
-        LogicalRelationalOperator referred = project.findReferent();
-        // If it is nested foreach, generate new uid
-        if (referred instanceof LOForEach)
-            needNewUid = true;
-        List<Operator> srcs = referred.getPlan().getSources();
         List<LOInnerLoad> innerLoads = new ArrayList<LOInnerLoad>();
-        for (Operator src:srcs) {
-            if (src instanceof LOInnerLoad) {
-            	if( src == referred ) {
-            		innerLoads.add( (LOInnerLoad)src );
-            		continue;
-            	}
-            	
-            	Deque<Operator> stack = new LinkedList<Operator>();
-                List<Operator> succs = referred.getPlan().getSuccessors( src );
-                if( succs != null ) {
-                	for( Operator succ : succs ) {
-                		stack.push( succ );
-                	}
-                }
-                
-                while( !stack.isEmpty() ) {
-                	Operator op = stack.pop();
-                    if( op == referred ) {
-                        innerLoads.add((LOInnerLoad)src);
-                        break;
-                    }
-                    else {
-                    	List<Operator> ops = referred.getPlan().getSuccessors( op );
-                    	if( ops != null ) {
-                        	for( Operator o : ops ) {
-                        		stack.push( o );
-                        	}
-                    	}
-                    }
+        LogicalRelationalOperator referred = project.findReferent();
+        Deque<Operator> stack = new LinkedList<Operator>();
+        stack.add(referred);
+        while( !stack.isEmpty() ) {
+            Operator op = stack.pop();
+            if (op instanceof LOInnerLoad) {
+                innerLoads.add((LOInnerLoad)op);
+            }
+            else if (!(op instanceof LOGenerate)) {
+                needNewUid = true;
+            }
+            List<Operator> ops = referred.getPlan().getPredecessors( op );
+            if( ops != null ) {
+                for( Operator o : ops ) {
+                    stack.push( o );
                 }
             }
         }

Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=1566083&r1=1566082&r2=1566083&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java Sat Feb  8 18:28:00 2014
@@ -51,6 +51,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
 import org.apache.pig.newplan.logical.rules.MergeFilter;
 import org.apache.pig.newplan.logical.rules.PushUpFilter;
@@ -570,6 +571,110 @@ public class TestNewPlanFilterRule {
         
     }
 
+    /**
+     * Test that filter cannot get pushed up over nested Distinct (see PIG-3347)
+     */
+    @Test
+    public void testFilterAfterNestedDistinct() throws Exception {
+        String query = "a = LOAD 'file.txt';" +
+            "a_group = group a by $0;" +
+            "b = foreach a_group { a_distinct = distinct a.$0;generate group, a_distinct;}" +
+            "c = filter b by SIZE(a_distinct) == 1;" +
+            "store c into 'empty';";
+
+        // filter should not be pushed above nested distinct,
+        //ie expect - loload -> locogroup -> foreach -> filter
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
+        newLogicalPlan.explain(System.out, "text", true);
+
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator cogroup = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( cogroup instanceof LOCogroup );
+        Operator foreach = newLogicalPlan.getSuccessors(cogroup).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
+        Operator filter = newLogicalPlan.getSuccessors(foreach).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+    }
+
+    /**
+     * Test that filter cannot get pushed up over nested Limit (see PIG-3347)
+     */
+    @Test
+    public void testFilterAfterNestedLimit() throws Exception {
+        String query = "a = LOAD 'file.txt';" +
+            "a_group = group a by $0;" +
+            "b = foreach a_group { a_limit = limit a.$0 5;generate group, a_limit;}" +
+            "c = filter b by SIZE(a_limit) == 1;" +
+            "store c into 'empty';";
+
+        // filter should not be pushed above nested limit,
+        //ie expect - loload -> locogroup -> foreach -> filter
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
+        newLogicalPlan.explain(System.out, "text", true);
+
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator cogroup = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( cogroup instanceof LOCogroup );
+        Operator foreach = newLogicalPlan.getSuccessors(cogroup).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
+        Operator filter = newLogicalPlan.getSuccessors(foreach).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+    }
+
+    /**
+     * Test that filter cannot get pushed up over nested Filter (see PIG-3347)
+     */
+    @Test
+    public void testFilterAfterNestedFilter() throws Exception {
+        String query = "a = LOAD 'file.txt';" +
+            "a_group = group a by $0;" +
+            "b = foreach a_group { a_filter = filter a by $0 == 1;generate group, a_filter;}" +
+            "c = filter b by SIZE(a_filter) == 1;" +
+            "store c into 'empty';";
+
+        // filter should not be pushed above nested filter,
+        //ie expect - loload -> locogroup -> foreach -> filter
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
+        newLogicalPlan.explain(System.out, "text", true);
+
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator cogroup = newLogicalPlan.getSuccessors( load ).get( 0 );
+        Assert.assertTrue( cogroup instanceof LOCogroup );
+        Operator foreach = newLogicalPlan.getSuccessors(cogroup).get( 0 );
+        Assert.assertTrue( foreach instanceof LOForEach );
+        Operator filter = newLogicalPlan.getSuccessors(foreach).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+    }
+    
+    /**
+     * Test that filter does not get blocked for PushUpFilter/FilterAboveForeach
+     * by an unrelated nested filter (see PIG-3347)
+     */
+    @Test
+    public void testFilterAfterUnrelatedNestedFilter() throws Exception {
+        String query = "a = LOAD 'file.txt' as (a0:int, a1_bag:bag{(X:int)}, a2_bag:bag{(Y:int)});" +
+            "b = foreach a { a1_filter = filter a1_bag by X == 1; generate a0, a1_filter, a2_bag;}" +
+            "c = filter b by SIZE(a2_bag) == 1;" +
+            "store c into 'empty';";
+
+        // filter should still be pushed up, since it does not depend on the nested filter's output,
+        //ie expect - loload -> foreach -> filter -> foreach
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
+        newLogicalPlan.explain(System.out, "text", true);
+
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Operator foreach1 = newLogicalPlan.getSuccessors(load).get( 0 );
+        Assert.assertTrue( foreach1 instanceof LOForEach );
+        Operator filter = newLogicalPlan.getSuccessors( foreach1 ).get( 0 );
+        Assert.assertTrue( filter instanceof LOFilter );
+        Operator foreach2 = newLogicalPlan.getSuccessors(filter).get( 0 );
+        Assert.assertTrue( foreach2 instanceof LOForEach );
+    }
+
     private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
     	PigServer pigServer = new PigServer(pc);
         LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
@@ -602,6 +707,11 @@ public class TestNewPlanFilterRule {
             r = new PushUpFilter( "PushUpFilter" );
             s.add(r);            
             ls.add(s);
+
+            s = new HashSet<Rule>();
+            r = new FilterAboveForeach( "PushUpFilter" );
+            s.add(r);
+            ls.add(s);
             
             return ls;
         }