You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/02/08 19:28:00 UTC
svn commit: r1566083 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/newplan/logical/relational/LOForEach.java
test/org/apache/pig/test/TestNewPlanFilterRule.java
Author: daijy
Date: Sat Feb 8 18:28:00 2014
New Revision: 1566083
URL: http://svn.apache.org/r1566083
Log:
PIG-3347: Store invocation brings side effect
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java
pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1566083&r1=1566082&r2=1566083&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Feb 8 18:28:00 2014
@@ -213,6 +213,8 @@ PIG-3480: TFile-based tmpfile compressio
BUG FIXES
+PIG-3347: Store invocation brings side effect (daijy)
+
PIG-3670: Fix assert in Pig script (daijy)
PIG-3741: Utils.setTmpFileCompressionOnConf can cause side effect for SequenceFileInterStorage (aniket486)
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java?rev=1566083&r1=1566082&r2=1566083&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java Sat Feb 8 18:28:00 2014
@@ -77,44 +77,25 @@ public class LOForEach extends LogicalRe
}
// Find the LOInnerLoad of the inner plan corresponding to the project, and
- // also find whether there is a LOForEach in inner plan along the way
+ // also find whether there is a relational operator in inner plan along the way
public static Pair<List<LOInnerLoad>, Boolean> findReacheableInnerLoadFromBoundaryProject(ProjectExpression project) throws FrontendException {
boolean needNewUid = false;
- LogicalRelationalOperator referred = project.findReferent();
- // If it is nested foreach, generate new uid
- if (referred instanceof LOForEach)
- needNewUid = true;
- List<Operator> srcs = referred.getPlan().getSources();
List<LOInnerLoad> innerLoads = new ArrayList<LOInnerLoad>();
- for (Operator src:srcs) {
- if (src instanceof LOInnerLoad) {
- if( src == referred ) {
- innerLoads.add( (LOInnerLoad)src );
- continue;
- }
-
- Deque<Operator> stack = new LinkedList<Operator>();
- List<Operator> succs = referred.getPlan().getSuccessors( src );
- if( succs != null ) {
- for( Operator succ : succs ) {
- stack.push( succ );
- }
- }
-
- while( !stack.isEmpty() ) {
- Operator op = stack.pop();
- if( op == referred ) {
- innerLoads.add((LOInnerLoad)src);
- break;
- }
- else {
- List<Operator> ops = referred.getPlan().getSuccessors( op );
- if( ops != null ) {
- for( Operator o : ops ) {
- stack.push( o );
- }
- }
- }
+ LogicalRelationalOperator referred = project.findReferent();
+ Deque<Operator> stack = new LinkedList<Operator>();
+ stack.add(referred);
+ while( !stack.isEmpty() ) {
+ Operator op = stack.pop();
+ if (op instanceof LOInnerLoad) {
+ innerLoads.add((LOInnerLoad)op);
+ }
+ else if (!(op instanceof LOGenerate)) {
+ needNewUid = true;
+ }
+ List<Operator> ops = referred.getPlan().getPredecessors( op );
+ if( ops != null ) {
+ for( Operator o : ops ) {
+ stack.push( o );
}
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=1566083&r1=1566082&r2=1566083&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java Sat Feb 8 18:28:00 2014
@@ -51,6 +51,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.logical.rules.MergeFilter;
import org.apache.pig.newplan.logical.rules.PushUpFilter;
@@ -570,6 +571,110 @@ public class TestNewPlanFilterRule {
}
+ /**
+ * Test that filter cannot get pushed up over nested Distinct (see PIG-3347)
+ */
+ @Test
+ public void testFilterAfterNestedDistinct() throws Exception {
+ String query = "a = LOAD 'file.txt';" +
+ "a_group = group a by $0;" +
+ "b = foreach a_group { a_distinct = distinct a.$0;generate group, a_distinct;}" +
+ "c = filter b by SIZE(a_distinct) == 1;" +
+ "store c into 'empty';";
+
+ // filter should not be pushed above nested distinct,
+ //ie expect - loload -> locogroup -> foreach -> filter
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
+ newLogicalPlan.explain(System.out, "text", true);
+
+ Operator load = newLogicalPlan.getSources().get( 0 );
+ Assert.assertTrue( load instanceof LOLoad );
+ Operator cogroup = newLogicalPlan.getSuccessors( load ).get( 0 );
+ Assert.assertTrue( cogroup instanceof LOCogroup );
+ Operator foreach = newLogicalPlan.getSuccessors(cogroup).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
+ Operator filter = newLogicalPlan.getSuccessors(foreach).get( 0 );
+ Assert.assertTrue( filter instanceof LOFilter );
+ }
+
+ /**
+ * Test that filter cannot get pushed up over nested Limit (see PIG-3347)
+ */
+ @Test
+ public void testFilterAfterNestedLimit() throws Exception {
+ String query = "a = LOAD 'file.txt';" +
+ "a_group = group a by $0;" +
+ "b = foreach a_group { a_limit = limit a.$0 5;generate group, a_limit;}" +
+ "c = filter b by SIZE(a_limit) == 1;" +
+ "store c into 'empty';";
+
+ // filter should not be pushed above nested limit,
+ //ie expect - loload -> locogroup -> foreach -> filter
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
+ newLogicalPlan.explain(System.out, "text", true);
+
+ Operator load = newLogicalPlan.getSources().get( 0 );
+ Assert.assertTrue( load instanceof LOLoad );
+ Operator cogroup = newLogicalPlan.getSuccessors( load ).get( 0 );
+ Assert.assertTrue( cogroup instanceof LOCogroup );
+ Operator foreach = newLogicalPlan.getSuccessors(cogroup).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
+ Operator filter = newLogicalPlan.getSuccessors(foreach).get( 0 );
+ Assert.assertTrue( filter instanceof LOFilter );
+ }
+
+ /**
+ * Test that filter cannot get pushed up over nested Filter (see PIG-3347)
+ */
+ @Test
+ public void testFilterAfterNestedFilter() throws Exception {
+ String query = "a = LOAD 'file.txt';" +
+ "a_group = group a by $0;" +
+ "b = foreach a_group { a_filter = filter a by $0 == 1;generate group, a_filter;}" +
+ "c = filter b by SIZE(a_filter) == 1;" +
+ "store c into 'empty';";
+
+ // filter should not be pushed above nested filter,
+ //ie expect - loload -> locogroup -> foreach -> filter
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
+ newLogicalPlan.explain(System.out, "text", true);
+
+ Operator load = newLogicalPlan.getSources().get( 0 );
+ Assert.assertTrue( load instanceof LOLoad );
+ Operator cogroup = newLogicalPlan.getSuccessors( load ).get( 0 );
+ Assert.assertTrue( cogroup instanceof LOCogroup );
+ Operator foreach = newLogicalPlan.getSuccessors(cogroup).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
+ Operator filter = newLogicalPlan.getSuccessors(foreach).get( 0 );
+ Assert.assertTrue( filter instanceof LOFilter );
+ }
+
+ /**
+ * Test that filter does not get blocked for PushUpFilter/FilterAboveForeach
+ * by an unrelated nested filter (see PIG-3347)
+ */
+ @Test
+ public void testFilterAfterUnrelatedNestedFilter() throws Exception {
+ String query = "a = LOAD 'file.txt' as (a0:int, a1_bag:bag{(X:int)}, a2_bag:bag{(Y:int)});" +
+ "b = foreach a { a1_filter = filter a1_bag by X == 1; generate a0, a1_filter, a2_bag;}" +
+ "c = filter b by SIZE(a2_bag) == 1;" +
+ "store c into 'empty';";
+
+ // filter should be pushed above the foreach holding the unrelated nested filter,
+ //ie expect - loload -> foreach -> filter -> foreach
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
+ newLogicalPlan.explain(System.out, "text", true);
+
+ Operator load = newLogicalPlan.getSources().get( 0 );
+ Assert.assertTrue( load instanceof LOLoad );
+ Operator foreach1 = newLogicalPlan.getSuccessors(load).get( 0 );
+ Assert.assertTrue( foreach1 instanceof LOForEach );
+ Operator filter = newLogicalPlan.getSuccessors( foreach1 ).get( 0 );
+ Assert.assertTrue( filter instanceof LOFilter );
+ Operator foreach2 = newLogicalPlan.getSuccessors(filter).get( 0 );
+ Assert.assertTrue( foreach2 instanceof LOForEach );
+ }
+
private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
PigServer pigServer = new PigServer(pc);
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
@@ -602,6 +707,11 @@ public class TestNewPlanFilterRule {
r = new PushUpFilter( "PushUpFilter" );
s.add(r);
ls.add(s);
+
+ s = new HashSet<Rule>();
+ r = new FilterAboveForeach( "PushUpFilter" );
+ s.add(r);
+ ls.add(s);
return ls;
}