You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/11/01 23:55:28 UTC
svn commit: r1029878 - in /pig/branches/branch-0.8: CHANGES.txt
src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
Author: daijy
Date: Mon Nov 1 22:55:27 2010
New Revision: 1029878
URL: http://svn.apache.org/viewvc?rev=1029878&view=rev
Log:
PIG-1706: New logical plan: PushDownFlattenForEach fails if flattened field has user defined schema
Modified:
pig/branches/branch-0.8/CHANGES.txt
pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1029878&r1=1029877&r2=1029878&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Mon Nov 1 22:55:27 2010
@@ -204,6 +204,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1706: New logical plan: PushDownFlattenForEach fail if flattened field has user defined schema (daijy)
+
PIG-1705: New logical plan: self-join fail for some queries (daijy)
PIG-1704: Output Compression is not at work if the output path is absolute and there is a trailing / afte the compression suffix (yanz)
Modified: pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java?rev=1029878&r1=1029877&r2=1029878&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java Mon Nov 1 22:55:27 2010
@@ -19,8 +19,10 @@ package org.apache.pig.newplan.logical.r
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
@@ -33,6 +35,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.OperatorSubPlan;
@@ -230,6 +233,7 @@ public class PushDownForEachFlatten exte
} else if( next instanceof LOCross || next instanceof LOJoin ) {
List<Operator> preds = currentPlan.getPredecessors( next );
List<Integer> fieldsToBeFlattaned = new ArrayList<Integer>();
+ Map<Integer, LogicalSchema> cachedUserDefinedSchema = new HashMap<Integer, LogicalSchema>();
boolean[] flags = null;
int fieldCount = 0;
for( Operator op : preds ) {
@@ -238,7 +242,12 @@ public class PushDownForEachFlatten exte
flags = gen.getFlattenFlags();
for( int i = 0; i < flags.length; i++ ) {
if( flags[i] ) {
- fieldsToBeFlattaned.add( fieldCount++ );
+ fieldsToBeFlattaned.add(fieldCount);
+ if (gen.getUserDefinedSchema()!=null && gen.getUserDefinedSchema().get(i)!=null) {
+ cachedUserDefinedSchema.put(fieldCount, gen.getUserDefinedSchema().get(i));
+ gen.getUserDefinedSchema().set(i, null);
+ }
+ fieldCount++;
} else {
fieldCount++;
}
@@ -248,9 +257,19 @@ public class PushDownForEachFlatten exte
}
}
+
boolean[] flattenFlags = new boolean[fieldCount];
+ List<LogicalSchema> mUserDefinedSchema = null;
+ if (cachedUserDefinedSchema!=null) {
+ mUserDefinedSchema = new ArrayList<LogicalSchema>();
+ for (int i=0;i<fieldCount;i++)
+ mUserDefinedSchema.add(null);
+ }
for( Integer i : fieldsToBeFlattaned ) {
flattenFlags[i] = true;
+ if (cachedUserDefinedSchema.containsKey(i)) {
+ mUserDefinedSchema.set(i, cachedUserDefinedSchema.get(i));
+ }
}
// Now create a new foreach after cross/join and insert it into the plan.
@@ -258,6 +277,8 @@ public class PushDownForEachFlatten exte
LogicalPlan innerPlan = new LogicalPlan();
List<LogicalExpressionPlan> exprs = new ArrayList<LogicalExpressionPlan>( fieldCount );
LOGenerate gen = new LOGenerate( innerPlan, exprs, flattenFlags );
+ if (mUserDefinedSchema!=null)
+ gen.setUserDefinedSchema(mUserDefinedSchema);
innerPlan.add( gen );
newForeach.setInnerPlan( innerPlan );
for( int i = 0; i < fieldCount; i++ ) {
Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java?rev=1029878&r1=1029877&r2=1029878&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java Mon Nov 1 22:55:27 2010
@@ -1024,6 +1024,21 @@ public class TestNewPlanPushDownForeachF
Operator sort = newLogicalPlan.getSuccessors( foreach1 ).get( 0 );
Assert.assertTrue( sort instanceof LOSort );
}
+
+ // See PIG-1706
+ @Test
+ public void testForeachWithUserDefinedSchema() throws Exception {
+ planTester.buildPlan("a = load '1.txt' as (a0:int, a1, a2:bag{t:(i1:int, i2:int)});");
+ planTester.buildPlan("b = load '2.txt' as (b0:int, b1);");
+ planTester.buildPlan("c = foreach a generate a0, flatten(a2) as (q1, q2);");
+ org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("d = join c by a0, b by b0;");
+
+ LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+
+ LOForEach foreach = (LOForEach)newLogicalPlan.getSinks().get( 0 );
+ Assert.assertTrue(foreach.getSchema().getField(1).alias.equals("q1"));
+ Assert.assertTrue(foreach.getSchema().getField(2).alias.equals("q2"));
+ }
public class MyPlanOptimizer extends LogicalPlanOptimizer {
protected MyPlanOptimizer(OperatorPlan p, int iterations) {