You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/08/07 19:52:40 UTC

svn commit: r1370398 - in /pig/branches/branch-0.9: CHANGES.txt src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java

Author: daijy
Date: Tue Aug  7 17:52:40 2012
New Revision: 1370398

URL: http://svn.apache.org/viewvc?rev=1370398&view=rev
Log:
PIG-2721: Wrong output generated while loading bags as input

Modified:
    pig/branches/branch-0.9/CHANGES.txt
    pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
    pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java

Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1370398&r1=1370397&r2=1370398&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Tue Aug  7 17:52:40 2012
@@ -30,6 +30,8 @@ PIG-2619: HBaseStorage constructs a Scan
 
 BUG FIXES
 
+PIG-2721: Wrong output generated while loading bags as input (knoguchi via daijy)
+
 PIG-2761: With hadoop23 importing modules inside python script does not work (rohini via daijy)
 
 PIG-2775: Register jar does not goes to classpath in some cases (daijy)

Modified: pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java?rev=1370398&r1=1370397&r2=1370398&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java Tue Aug  7 17:52:40 2012
@@ -237,6 +237,8 @@ public class PushDownForEachFlatten exte
                 } else {
                     currentPlan.connect( next, foreach );
                 }
+                subPlan.add(foreach);
+                subPlan.add(next);
             } else if( next instanceof LOCross || next instanceof LOJoin ) {
                 List<Operator> preds = currentPlan.getPredecessors( next );
                 List<Integer> fieldsToBeFlattaned = new ArrayList<Integer>();

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java?rev=1370398&r1=1370397&r2=1370398&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java Tue Aug  7 17:52:40 2012
@@ -35,6 +35,8 @@ import org.apache.pig.newplan.OperatorPl
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
 import org.apache.pig.newplan.optimizer.PlanOptimizer;
 import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher;
+import org.apache.pig.newplan.logical.optimizer.SchemaPatcher;
 import org.apache.pig.newplan.logical.relational.LOCross;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOJoin;
@@ -42,6 +44,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
 import org.apache.pig.newplan.logical.rules.OptimizerUtils;
 import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
@@ -1121,6 +1124,62 @@ public class TestNewPlanPushDownForeachF
         Assert.assertTrue(op instanceof LOJoin);
     }
 
+    // See PIG-2721: after optimizing with pruning enabled, the load schema must still contain "a1".
+    @Test
+    public void testForeachSortWithUserDefinedSchema() throws Exception {
+        String query =
+        "a = load '1.txt' as (a0:int, a1:bag{t:(i1:int, i2:int)});" +
+        "b = foreach a generate a0, flatten(a1) as (q1, q2);" +
+        "c = order b by a0;" +
+        "store c into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlanWithPruning( query );
+
+        Operator load = newLogicalPlan.getSources().get( 0 );
+        Assert.assertTrue( load instanceof LOLoad );
+        Assert.assertTrue( "Field \"a1\" is dropped by ColumnMapKeyPrune " +
+                  "even though it should be stored",
+                  ((LOLoad)load).getSchema().getField("a1") != null );
+    }
+
+    public class MyPlanOptimizerWithPruning extends LogicalPlanOptimizer {
+        // Registers schema and projection patchers as plan-transform listeners.
+        protected MyPlanOptimizerWithPruning (OperatorPlan p,  int iterations) {
+            super(p, iterations, new HashSet<String>());
+            addPlanTransformListener(new SchemaPatcher());
+            addPlanTransformListener(new ProjectionPatcher());
+        }
+        // Builds three single-rule sets, in order: type-cast insertion, flatten push-down, pruning.
+        protected List<Set<Rule>> buildRuleSets() {
+            List<Set<Rule>> ruleSets = new ArrayList<Set<Rule>>();
+
+            // Set 1: LoadTypeCastInserter.
+            Set<Rule> castRules = new HashSet<Rule>();
+            castRules.add(new LoadTypeCastInserter( "TypeCastInserter" ));
+            ruleSets.add(castRules);
+
+            // Set 2: PushDownForEachFlatten.
+            Set<Rule> flattenRules = new HashSet<Rule>();
+            flattenRules.add(new PushDownForEachFlatten( "PushDownForEachFlatten" ));
+            ruleSets.add(flattenRules);
+
+            // Set 3: ColumnMapKeyPrune.
+            Set<Rule> pruneRules = new HashSet<Rule>();
+            pruneRules.add(new ColumnMapKeyPrune( "ColumnMapKeyPrune" ));
+            ruleSets.add(pruneRules);
+
+            return ruleSets;
+        }
+    }
+
+    private LogicalPlan migrateAndOptimizePlanWithPruning(String query) throws Exception {
+        // Build the logical plan for the query, then optimize it in place (iterations = 3).
+        LogicalPlan plan = Util.buildLp(new PigServer( pc ), query);
+        PlanOptimizer pruningOptimizer = new MyPlanOptimizerWithPruning( plan, 3 );
+        pruningOptimizer.optimize();
+        return plan;
+    }
+
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
             super(p, iterations, new HashSet<String>());