You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/11/01 23:55:28 UTC

svn commit: r1029878 - in /pig/branches/branch-0.8: CHANGES.txt src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java

Author: daijy
Date: Mon Nov  1 22:55:27 2010
New Revision: 1029878

URL: http://svn.apache.org/viewvc?rev=1029878&view=rev
Log:
PIG-1706: New logical plan: PushDownFlattenForEach fails if the flattened field has a user-defined schema

Modified:
    pig/branches/branch-0.8/CHANGES.txt
    pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
    pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java

Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1029878&r1=1029877&r2=1029878&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Mon Nov  1 22:55:27 2010
@@ -204,6 +204,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1706: New logical plan: PushDownFlattenForEach fail if flattened field has user defined schema (daijy)
+
 PIG-1705: New logical plan: self-join fail for some queries (daijy)
 
 PIG-1704: Output Compression is not at work if the output path is absolute and there is a trailing / afte the compression suffix (yanz)

Modified: pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java?rev=1029878&r1=1029877&r2=1029878&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java Mon Nov  1 22:55:27 2010
@@ -19,8 +19,10 @@ package org.apache.pig.newplan.logical.r
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
@@ -33,6 +35,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.OperatorSubPlan;
@@ -230,6 +233,7 @@ public class PushDownForEachFlatten exte
             } else if( next instanceof LOCross || next instanceof LOJoin ) {
                 List<Operator> preds = currentPlan.getPredecessors( next );
                 List<Integer> fieldsToBeFlattaned = new ArrayList<Integer>();
+                Map<Integer, LogicalSchema> cachedUserDefinedSchema = new HashMap<Integer, LogicalSchema>();
                 boolean[] flags = null;
                 int fieldCount = 0;
                 for( Operator op : preds ) {
@@ -238,7 +242,12 @@ public class PushDownForEachFlatten exte
                         flags = gen.getFlattenFlags();
                         for( int i = 0; i < flags.length; i++ ) {
                             if( flags[i] ) {
-                                fieldsToBeFlattaned.add( fieldCount++ );
+                                fieldsToBeFlattaned.add(fieldCount);
+                                if (gen.getUserDefinedSchema()!=null && gen.getUserDefinedSchema().get(i)!=null) {
+                                    cachedUserDefinedSchema.put(fieldCount, gen.getUserDefinedSchema().get(i));
+                                    gen.getUserDefinedSchema().set(i, null);
+                                }
+                                fieldCount++;
                             } else {
                                 fieldCount++;
                             }
@@ -248,9 +257,19 @@ public class PushDownForEachFlatten exte
                     }
                 }
                 
+                
                 boolean[] flattenFlags = new boolean[fieldCount];
+                List<LogicalSchema> mUserDefinedSchema = null;
+                if (cachedUserDefinedSchema!=null) {
+                    mUserDefinedSchema = new ArrayList<LogicalSchema>();
+                    for (int i=0;i<fieldCount;i++)
+                        mUserDefinedSchema.add(null);
+                }
                 for( Integer i : fieldsToBeFlattaned ) {
                     flattenFlags[i] = true;
+                    if (cachedUserDefinedSchema.containsKey(i)) {
+                        mUserDefinedSchema.set(i, cachedUserDefinedSchema.get(i));
+                    }
                 }
                 
                 // Now create a new foreach after cross/join and insert it into the plan.
@@ -258,6 +277,8 @@ public class PushDownForEachFlatten exte
                 LogicalPlan innerPlan = new LogicalPlan();
                 List<LogicalExpressionPlan> exprs = new ArrayList<LogicalExpressionPlan>( fieldCount );
                 LOGenerate gen = new LOGenerate( innerPlan, exprs, flattenFlags );
+                if (mUserDefinedSchema!=null)
+                    gen.setUserDefinedSchema(mUserDefinedSchema);
                 innerPlan.add( gen );
                 newForeach.setInnerPlan( innerPlan );
                 for( int i = 0; i < fieldCount; i++ ) {

Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java?rev=1029878&r1=1029877&r2=1029878&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java Mon Nov  1 22:55:27 2010
@@ -1024,6 +1024,21 @@ public class TestNewPlanPushDownForeachF
         Operator sort = newLogicalPlan.getSuccessors( foreach1 ).get( 0 );
         Assert.assertTrue( sort instanceof LOSort );
     }
+    
+    // See PIG-1706
+    @Test
+    public void testForeachWithUserDefinedSchema() throws Exception {
+        planTester.buildPlan("a = load '1.txt' as (a0:int, a1, a2:bag{t:(i1:int, i2:int)});");
+        planTester.buildPlan("b = load '2.txt' as (b0:int, b1);");
+        planTester.buildPlan("c = foreach a generate a0, flatten(a2) as (q1, q2);");
+        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("d = join c by a0, b by b0;");
+        
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        
+        LOForEach foreach = (LOForEach)newLogicalPlan.getSinks().get( 0 );
+        Assert.assertTrue(foreach.getSchema().getField(1).alias.equals("q1"));
+        Assert.assertTrue(foreach.getSchema().getField(2).alias.equals("q2"));
+    }
 
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {