You are viewing a plain-text version of this content. The canonical link to it (originally the word "here") was not preserved in this rendering.
Posted to commits@pig.apache.org by da...@apache.org on 2012/08/07 19:52:40 UTC
svn commit: r1370398 - in /pig/branches/branch-0.9: CHANGES.txt
src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
Author: daijy
Date: Tue Aug 7 17:52:40 2012
New Revision: 1370398
URL: http://svn.apache.org/viewvc?rev=1370398&view=rev
Log:
PIG-2721: Wrong output generated while loading bags as input
Modified:
pig/branches/branch-0.9/CHANGES.txt
pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1370398&r1=1370397&r2=1370398&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Tue Aug 7 17:52:40 2012
@@ -30,6 +30,8 @@ PIG-2619: HBaseStorage constructs a Scan
BUG FIXES
+PIG-2721: Wrong output generated while loading bags as input (knoguchi via daijy)
+
PIG-2761: With hadoop23 importing modules inside python script does not work (rohini via daijy)
PIG-2775: Register jar does not goes to classpath in some cases (daijy)
Modified: pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java?rev=1370398&r1=1370397&r2=1370398&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java Tue Aug 7 17:52:40 2012
@@ -237,6 +237,8 @@ public class PushDownForEachFlatten exte
} else {
currentPlan.connect( next, foreach );
}
+ subPlan.add(foreach);
+ subPlan.add(next);
} else if( next instanceof LOCross || next instanceof LOJoin ) {
List<Operator> preds = currentPlan.getPredecessors( next );
List<Integer> fieldsToBeFlattaned = new ArrayList<Integer>();
Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java?rev=1370398&r1=1370397&r2=1370398&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java Tue Aug 7 17:52:40 2012
@@ -35,6 +35,8 @@ import org.apache.pig.newplan.OperatorPl
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.optimizer.PlanOptimizer;
import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher;
+import org.apache.pig.newplan.logical.optimizer.SchemaPatcher;
import org.apache.pig.newplan.logical.relational.LOCross;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOJoin;
@@ -42,6 +44,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.logical.rules.OptimizerUtils;
import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
@@ -1121,6 +1124,62 @@ public class TestNewPlanPushDownForeachF
Assert.assertTrue(op instanceof LOJoin);
}
+ // See PIG-2721: after PushDownForEachFlatten, ColumnMapKeyPrune must not drop the bag column a1 from the load.
+ @Test
+ public void testForeachSortWithUserDefinedSchema() throws Exception {
+ String query =
+ "a = load '1.txt' as (a0:int, a1:bag{t:(i1:int, i2:int)});" +
+ "b = foreach a generate a0, flatten(a1) as (q1, q2);" +
+ "c = order b by a0;" +
+ "store c into 'output';";
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlanWithPruning( query );
+
+ Operator load = newLogicalPlan.getSources().get( 0 );
+ Assert.assertTrue( load instanceof LOLoad );
+ Assert.assertTrue( "Field \"a1\" is dropped by ColumnMapKeyPrune" +
+ " even though it should be stored",
+ ((LOLoad)load).getSchema().getField("a1") != null );
+ }
+
+ public class MyPlanOptimizerWithPruning extends LogicalPlanOptimizer {
+ protected MyPlanOptimizerWithPruning (OperatorPlan p, int iterations) {
+ super(p, iterations, new HashSet<String>());
+ addPlanTransformListener(new SchemaPatcher());
+ addPlanTransformListener(new ProjectionPatcher());
+ }
+
+ @Override
+ protected List<Set<Rule>> buildRuleSets() {
+ List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+ // add load type-cast inserter rule
+ Set<Rule> s = new HashSet<Rule>();
+ Rule r = new LoadTypeCastInserter( "TypeCastInserter" );
+ s.add(r);
+ ls.add(s);
+ // add foreach-flatten push-down rule (listed before ColumnMapKeyPrune)
+ s = new HashSet<Rule>();
+ r = new PushDownForEachFlatten( "PushDownForEachFlatten" );
+ s.add(r);
+ ls.add(s);
+ // add column/map-key pruning rule
+ s = new HashSet<Rule>();
+ r = new ColumnMapKeyPrune( "ColumnMapKeyPrune" );
+ s.add(r);
+ ls.add(s);
+
+ return ls;
+ }
+ }
+
+ private LogicalPlan migrateAndOptimizePlanWithPruning(String query) throws Exception {
+ // Build the logical plan for the query, then optimize that same plan in place (3 iterations).
+ LogicalPlan plan = Util.buildLp(new PigServer( pc ), query);
+ PlanOptimizer pruningOptimizer = new MyPlanOptimizerWithPruning( plan, 3 );
+ pruningOptimizer.optimize();
+ return plan;
+ }
+
public class MyPlanOptimizer extends LogicalPlanOptimizer {
protected MyPlanOptimizer(OperatorPlan p, int iterations) {
super(p, iterations, new HashSet<String>());