You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/03/11 17:29:42 UTC
svn commit: r1080642 [1/2] - in /pig/trunk: src/org/apache/pig/
src/org/apache/pig/newplan/logical/
src/org/apache/pig/newplan/logical/expression/
src/org/apache/pig/newplan/logical/relational/
src/org/apache/pig/newplan/logical/visitor/ src/org/apache...
Author: thejas
Date: Fri Mar 11 16:29:42 2011
New Revision: 1080642
URL: http://svn.apache.org/viewvc?rev=1080642&view=rev
Log:
PIG-1618: Switch to new parser generator technology - NewParser-23.2.patch - (thejas)
Added:
pig/trunk/test/org/apache/pig/test/TestProjectStarExpander.java
Modified:
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/src/org/apache/pig/newplan/logical/Util.java
pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
pig/trunk/test/org/apache/pig/test/TestSchema.java
pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1080642&r1=1080641&r2=1080642&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Fri Mar 11 16:29:42 2011
@@ -1524,8 +1524,7 @@ public class PigServer {
accuQuery.append( line + "\n" );
}
- String query = accuQuery.toString();
- return query;
+ return accuQuery.toString();
}
private void compile() throws IOException {
@@ -1536,7 +1535,6 @@ public class PigServer {
private void compile(LogicalPlan lp) throws FrontendException {
new ColumnAliasConversionVisitor( lp ).visit();
new SchemaAliasVisitor( lp ).visit();
- new ProjectStarExpander(lp).visit();
new ScalarVisitor( lp, pigContext ).visit();
// TODO: move optimizer here from HExecuteEngine.
Modified: pig/trunk/src/org/apache/pig/newplan/logical/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/Util.java?rev=1080642&r1=1080641&r2=1080642&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/Util.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/Util.java Fri Mar 11 16:29:42 2011
@@ -96,30 +96,13 @@ public class Util {
* it inserts a tuple schema. It does so for all inner levels.
* eg bag({int}) => bag({(int)})
* @param sch
+ * @return modified schema
* @throws FrontendException
*/
- public static void fixSchemaAddTupleInBag(Schema sch) throws FrontendException{
- for(FieldSchema fs : sch.getFields()){
- if(fs.schema != null){
- fixSchemaAddTupleInBag(fs.schema);
- }
- if(fs.type == DataType.BAG){
- // if there is no inner schema, add a empty tuple inner schema
- if(fs.schema == null){
- fs.schema = new Schema(new FieldSchema(null, DataType.TUPLE));
- }else if(
- (fs.schema.size() == 1 && fs.schema.getField(0).type != DataType.TUPLE)
- ||
- fs.schema.size() > 1
- ){
- //the inner schema is something other than tuple schema.
- // change it to a schema with single tuple field, this tuple
- // field will have the old inner schema as its inner schema
- fs.schema = new Schema(new FieldSchema(null, fs.schema, DataType.TUPLE));
- }
- }
-
- }
+ public static Schema fixSchemaAddTupleInBag(Schema sch) throws FrontendException{
+ LogicalSchema logSch = translateSchema(sch);
+ logSch.normalize();
+ return translateSchema(logSch);
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java?rev=1080642&r1=1080641&r2=1080642&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java Fri Mar 11 16:29:42 2011
@@ -102,7 +102,6 @@ public class BinCondExpression extends L
//TypeCheckingExpVisitor will ensure that lhs and rhs have same schema
LogicalFieldSchema argFs = getLhs().getFieldSchema();
fieldSchema = argFs.deepCopy();
- fieldSchema.resetUid();
uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
return fieldSchema;
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=1080642&r1=1080641&r2=1080642&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Fri Mar 11 16:29:42 2011
@@ -174,13 +174,12 @@ public class LOCogroup extends LogicalRe
break;
}
break;
- }
- if(mExpressionPlans.size() > 1){
- //reset the uid, because the group column is associated with more
- // than one input
- groupKeySchema.resetUid();
- }
-
+ }
+ }
+ if(mExpressionPlans.size() > 1){
+ //reset the uid, because the group column is associated with more
+ // than one input
+ groupKeySchema.resetUid();
}
if (groupKeySchema==null) {
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1080642&r1=1080641&r2=1080642&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Fri Mar 11 16:29:42 2011
@@ -116,6 +116,10 @@ public class LogicalSchema {
return toString(true);
}
+ /**
+ * Give new value for uid if uid of field schema or those in fields schema
+ * of inner schema are -1.
+ */
public void stampFieldSchema() {
if (uid==-1)
uid = LogicalExpression.getNextUid();
@@ -181,7 +185,7 @@ public class LogicalSchema {
* Reset uid of this fieldschema and inner schema
*/
public void resetUid(){
- uid = LogicalExpression.getNextUid();
+ uid = -1;
if(schema != null){
schema.resetUid();
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java?rev=1080642&r1=1080641&r2=1080642&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java Fri Mar 11 16:29:42 2011
@@ -17,6 +17,7 @@
*/
package org.apache.pig.newplan.logical.visitor;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -247,22 +248,36 @@ public class LineageFindRelVisitor exten
public void visit(LOCogroup group) throws FrontendException{
mapToPredLoadFunc(group);
List<Operator> inputs = group.getInputs((LogicalPlan)plan);
- if(inputs.size() == 1){
- mapToPredLoadFunc(group);
- }
+
+ // list of field schemas of group plans
+ List<LogicalFieldSchema> groupPlanSchemas = new ArrayList<LogicalFieldSchema>();
MultiMap<Integer, LogicalExpressionPlan> plans = group.getExpressionPlans();
for(LogicalExpressionPlan expPlan : plans.values()){
visitExpression(expPlan);
+ if(expPlan.getSources().size() != 1){
+ throw new AssertionError("Group plans should have only one output");
+ }
+ groupPlanSchemas.add(((LogicalExpression)expPlan.getSources().get(0)).getFieldSchema());
}
+
+ LogicalSchema sch = group.getSchema();
+ //if the group plans are associated with the same load function, associate
+ //the same load function with the group column schema
+ mapMatchLoadFuncToUid(sch.getField(0), groupPlanSchemas);
+
+
+
//set the load func spec for the bags in the schema, this helps if
// the input schemas are not set
- LogicalSchema sch = group.getSchema();
//group schema has a group column followed by bags corresponding to each
// input
if(sch.size() != inputs.size()+1 ){
throw new AssertionError("cogroup schema size not same as number of inputs");
}
+
+
+
for(int i=1; i < sch.size(); i++){
long uid = sch.getField(i).uid;
LogicalRelationalOperator input = (LogicalRelationalOperator) inputs.get(i-1);
@@ -435,6 +450,64 @@ public class LineageFindRelVisitor exten
throw new VisitorException(msg,2262, PigException.BUG) ;
}
}
+
+ /**
+ * if the uids in the input field schemas or their inner schemas map to the
+ * same load function, then map the uid of outFS (e.g. the new uid of a
+ * bincond or of a group column) also to that load function in uid2LoadFuncMap
+ * @param outFS
+ * @param inputFieldSchemas
+ * @throws VisitorException
+ */
+ void mapMatchLoadFuncToUid(
+ LogicalFieldSchema outFS,
+ List<LogicalFieldSchema> inputFieldSchemas) throws VisitorException {
+
+
+ if(inputFieldSchemas.size() == 0){
+ return;
+ }
+
+ //if the same non-null load func is associated with all field schemas,
+ // associate that with the uid of outFS
+ LogicalFieldSchema inpFS1 = inputFieldSchemas.get(0);
+ FuncSpec funcSpec1 = uid2LoadFuncMap.get(inpFS1.uid);
+ boolean allInnerSchemaMatch = false;
+ if(funcSpec1 != null){
+ boolean allMatch = true;
+ allInnerSchemaMatch = true;
+
+ for(LogicalFieldSchema fs : inputFieldSchemas){
+ //check if all func spec match
+ if(!funcSpec1.equals(uid2LoadFuncMap.get(fs.uid))){
+ allMatch = false;
+ break;
+ }
+ //check if all inner schema match for use later
+ if(outFS.schema == null || !outFS.schema.isEqual(fs.schema)){
+ allInnerSchemaMatch = false;
+ }
+ }
+ if(allMatch){
+ addUidLoadFuncToMap(outFS.uid, funcSpec1);
+ }
+ }
+
+ //recursively call the function for corresponding fields in inner schemas
+ if(allInnerSchemaMatch){
+ List<LogicalFieldSchema> outFields = outFS.schema.getFields();
+ for(int i=0; i<outFields.size(); i++){
+ List<LogicalFieldSchema> inFsList = new ArrayList<LogicalFieldSchema>();
+ for(LogicalFieldSchema fs : inputFieldSchemas){
+ inFsList.add(fs.schema.getField(i));
+ }
+ mapMatchLoadFuncToUid(outFields.get(i), inFsList);
+ }
+ }
+
+ }
+
+
/**
* If a input of dereference or map-lookup has associated load function,
@@ -506,10 +579,12 @@ public class LineageFindRelVisitor exten
uid2LoadFuncMap.put(binCond.getFieldSchema().uid, funcSpec);
}
else {
+ List<LogicalFieldSchema> inFieldSchemas = new ArrayList<LogicalFieldSchema>();
+ inFieldSchemas.add(lhs.getFieldSchema());
+ inFieldSchemas.add(rhs.getFieldSchema());
mapMatchLoadFuncToUid(
binCond.getFieldSchema(),
- lhs.getFieldSchema(),
- rhs.getFieldSchema()
+ inFieldSchemas
);
}
}
@@ -589,45 +664,7 @@ public class LineageFindRelVisitor exten
return null;
}
- /**
- * if uid in lhs and rhs fieldschema or inner schemas map to same
- * load function, then map the new uid in bincond also to same
- * load function in uid2LoadFuncMap
- * @param binCFs
- * @param fieldSchema2
- * @param fieldSchema3
- */
- private void mapMatchLoadFuncToUid(
- LogicalFieldSchema binCFs,
- LogicalFieldSchema lhsFs,
- LogicalFieldSchema rhsFs) {
-
- FuncSpec lhsFuncSpec = uid2LoadFuncMap.get(lhsFs.uid);
- FuncSpec rhsFuncSpec = uid2LoadFuncMap.get(lhsFs.uid);
-
- if(lhsFuncSpec == null || rhsFuncSpec == null){
- return;
- }
-
- if(lhsFuncSpec.equals(rhsFuncSpec)){
- uid2LoadFuncMap.put(binCFs.uid, lhsFuncSpec);
- }
-
- if(binCFs.schema != null){
- //then rhsFs.schema, lhsFs.schema has to be non-null as well
- //and of same size
- for(int i=0; i<binCFs.schema.size(); i++){
- mapMatchLoadFuncToUid(
- binCFs.schema.getField(i),
- lhsFs.schema.getField(i),
- rhsFs.schema.getField(i)
- );
- }
- }
-
- }
-
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java?rev=1080642&r1=1080641&r2=1080642&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java Fri Mar 11 16:29:42 2011
@@ -31,7 +31,6 @@ import org.apache.pig.newplan.DepthFirst
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanWalker;
-import org.apache.pig.newplan.ReverseDependencyOrderWalker;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
@@ -45,6 +44,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
import com.google.common.primitives.Booleans;
@@ -250,14 +250,10 @@ public class ProjectStarExpander extends
){
//there is a project-star to be expanded
+ LogicalSchema userStarSch = null;
//user schema for * is not supported (yet)
if(userSchema != null && userSchema.get(i) != null){
- String msg = "Schema not expected for project-star";
- throw new FrontendException(
- msg,
- 2265,
- PigException.BUG
- );
+ userStarSch = userSchema.get(i);
}
//replacing the existing project star with new ones
@@ -287,8 +283,22 @@ public class ProjectStarExpander extends
proj2InpRel.put(newProj, newInLoad);
newFlattens.add(flattens[i]);
- if(newUserSchema != null)
- newUserSchema.add(null);
+ if(newUserSchema != null ){
+
+ if(userStarSch != null
+ && userStarSch.getFields().size() > j
+ && userStarSch.getField(j) != null){
+
+ //if the project-star field has user specified schema, use the
+ // j'th field for this column
+ LogicalSchema sch = new LogicalSchema();
+ sch.addField(new LogicalFieldSchema(userStarSch.getField(j)));
+ newUserSchema.add(sch);
+ }
+ else{
+ newUserSchema.add(null);
+ }
+ }
}
}else{ //no project-star
@@ -336,6 +346,9 @@ public class ProjectStarExpander extends
gen.setFlattenFlags(Booleans.toArray(newFlattens));
gen.setUserDefinedSchema(newUserSchema);
+ gen.resetSchema();
+ foreach.resetSchema();
+
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java?rev=1080642&r1=1080641&r2=1080642&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/SchemaAliasVisitor.java Fri Mar 11 16:29:42 2011
@@ -18,6 +18,9 @@
package org.apache.pig.newplan.logical.visitor;
+import java.util.HashSet;
+import java.util.Set;
+
import org.apache.pig.PigException;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.PlanValidationException;
@@ -55,20 +58,22 @@ public class SchemaAliasVisitor extends
*/
protected void validate(LogicalRelationalOperator op) throws FrontendException {
LogicalSchema schema = op.getSchema();
- if( schema != null ) {
- for( int i = 0; i < schema.size(); i++ ) {
- for( int j = i + 1; j < schema.size(); j++ ) {
- if( schema.getField( i ) != null &&
- schema.getField( j ) != null &&
- schema.getField( i ).alias != null &&
- schema.getField( j ).alias != null &&
- schema.getField( i ).alias.equals( schema.getField( j ).alias ) ) {
+
+ Set<String> seenAliases = new HashSet<String>();
+ if( schema != null){
+ for( int i = 0; i < schema.size(); i++){
+ if( schema.getField(i) != null &&
+ schema.getField(i).alias != null
+ ){
+ String alias = schema.getField(i).alias;
+ if(seenAliases.contains(alias)){
int errCode = 1108;
String msg = "Duplicate schema alias: " + schema.getField( i ).alias;
if( op.getAlias() != null )
msg = msg + " in \"" + op.getAlias() + "\"";
throw new PlanValidationException( msg, errCode, PigException.INPUT );
}
+ seenAliases.add(alias);
}
}
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java?rev=1080642&r1=1080641&r2=1080642&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java Fri Mar 11 16:29:42 2011
@@ -1061,9 +1061,6 @@ public class TypeCheckingExpVisitor exte
public static boolean schemaEqualsForMatching(Schema inputSchema,
Schema udfSchema, boolean ignoreByteArrays) throws FrontendException {
- // the old udf schemas might not have tuple inside bag
- // fix that!
- Util.fixSchemaAddTupleInBag(udfSchema);
// If both of them are null, they are equal
if ((inputSchema == null) && (udfSchema == null)) {
@@ -1078,7 +1075,12 @@ public class TypeCheckingExpVisitor exte
if (udfSchema == null) {
return false;
}
-
+
+ // the old udf schemas might not have tuple inside bag
+ // fix that!
+ udfSchema = Util.fixSchemaAddTupleInBag(udfSchema);
+
+
if (inputSchema.size() != udfSchema.size())
return false;
Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java?rev=1080642&r1=1080641&r2=1080642&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java Fri Mar 11 16:29:42 2011
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.List;
import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.validators.TypeCheckerException;
@@ -360,11 +359,7 @@ public class TypeCheckingRelVisitor exte
// Manipulate the plan structure
plan.add(foreach);
- plan.disconnect(fromOp, toOp) ;
-
- plan.connect(fromOp, foreach);
- plan.connect(foreach, toOp);
-
+ plan.insertBetween(fromOp, foreach, toOp);
return foreach;
}
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1080642&r1=1080641&r2=1080642&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Fri Mar 11 16:29:42 2011
@@ -83,6 +83,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE;
import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.newplan.logical.visitor.ProjectStarExpander;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
@@ -201,7 +202,7 @@ public class LogicalPlanBuilder {
}
String buildSortOp(LOSort sort, String alias, String inputAlias, List<LogicalExpressionPlan> plans,
- List<Boolean> ascFlags, FuncSpec fs) {
+ List<Boolean> ascFlags, FuncSpec fs) throws ParserValidationException {
sort.setSortColPlans( plans );
sort.setUserFunc( fs );
if (ascFlags.isEmpty()) {
@@ -209,7 +210,13 @@ public class LogicalPlanBuilder {
ascFlags.add(true);
}
sort.setAscendingCols( ascFlags );
- return buildOp( sort, alias, inputAlias, null );
+ alias = buildOp( sort, alias, inputAlias, null );
+ try {
+ (new ProjectStarExpander(sort.getPlan())).visit(sort);
+ } catch (FrontendException e) {
+ throw new ParserValidationException(intStream, e);
+ }
+ return alias;
}
LOJoin createJoinOp() {
@@ -218,7 +225,8 @@ public class LogicalPlanBuilder {
String buildJoinOp(LOJoin op, String alias, List<String> inputAliases,
MultiMap<Integer, LogicalExpressionPlan> joinPlans,
- JOINTYPE jt, List<Boolean> innerFlags, String partitioner) {
+ JOINTYPE jt, List<Boolean> innerFlags, String partitioner)
+ throws ParserValidationException {
if (jt==null)
jt = JOINTYPE.HASH;
else {
@@ -238,7 +246,13 @@ public class LogicalPlanBuilder {
op.setJoinType( jt );
op.setInnerFlags( flags );
op.setJoinPlans( joinPlans );
- return buildOp( op, alias, inputAliases, partitioner );
+ alias = buildOp( op, alias, inputAliases, partitioner );
+ try {
+ (new ProjectStarExpander(op.getPlan())).visit(op);
+ } catch (FrontendException e) {
+ throw new ParserValidationException(intStream, e);
+ }
+ return alias;
}
LOCogroup createGroupOp() {
@@ -267,7 +281,13 @@ public class LogicalPlanBuilder {
op.setExpressionPlans( expressionPlans );
op.setGroupType( gt );
op.setInnerFlags( flags );
- return buildOp( op, alias, inputAliases, partitioner );
+ alias = buildOp( op, alias, inputAliases, partitioner );
+ try {
+ (new ProjectStarExpander(op.getPlan())).visit(op);
+ } catch (FrontendException e) {
+ throw new ParserValidationException(intStream, e);
+ }
+ return alias;
}
private String getAbolutePathForLoad(String filename, FuncSpec funcSpec)
@@ -365,9 +385,16 @@ public class LogicalPlanBuilder {
return new LOForEach( plan );
}
- String buildForeachOp(LOForEach op, String alias, String inputAlias, LogicalPlan innerPlan) {
+ String buildForeachOp(LOForEach op, String alias, String inputAlias, LogicalPlan innerPlan)
+ throws ParserValidationException {
op.setInnerPlan( innerPlan );
- return buildOp( op, alias, inputAlias, null );
+ alias = buildOp( op, alias, inputAlias, null );
+ try {
+ (new ProjectStarExpander(op.getPlan())).visit(op);
+ } catch (FrontendException e) {
+ throw new ParserValidationException(intStream, e);
+ }
+ return alias;
}
LOGenerate createGenerateOp(LogicalPlan plan) {
Added: pig/trunk/test/org/apache/pig/test/TestProjectStarExpander.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestProjectStarExpander.java?rev=1080642&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestProjectStarExpander.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestProjectStarExpander.java Fri Mar 11 16:29:42 2011
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+
+
+/**
+ * Test project of multiple fields
+ */
+public class TestProjectStarExpander {
+ private static final String INP_FILE_5FIELDS = "TestProjectStarExpander1";
+
+ @Before
+ public void setUp() throws Exception {
+ FileLocalizer.setInitialized(false);
+ }
+
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @BeforeClass
+ public static void oneTimeSetup() throws IOException, Exception {
+ // first input file
+ PrintWriter w = new PrintWriter(new FileWriter(INP_FILE_5FIELDS));
+ w.println("10\t20\t30\t40\t50");
+ w.println("11\t21\t31\t41\t51");
+ w.close();
+
+ }
+
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+
+ new File(INP_FILE_5FIELDS).delete();
+
+ }
+
+
+
+ @Test
+ public void testProjectStarForeach() throws IOException, ParseException {
+ PigServer pig = new PigServer(ExecType.LOCAL);
+
+ //specifying the new aliases only for initial set of fields
+ String query =
+ " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int, d : int, e : int);"
+ + "f = foreach l1 generate * as (aa, bb, cc);"
+ ;
+
+ Util.registerMultiLineQuery(pig, query);
+
+ Schema expectedSch = Util.getSchemaFromString("aa : int, bb : int, cc : int, d : int, e : int");
+ Schema sch = pig.dumpSchema("f");
+ assertEquals("Checking expected schema", expectedSch, sch);
+
+ //specifying aliases for all fields
+ query =
+ " l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int, d : int, e : int);"
+ + "f = foreach l1 generate * as (aa, bb, cc, dd, ee);"
+ ;
+ Util.registerMultiLineQuery(pig, query);
+
+ expectedSch = Util.getSchemaFromString("aa : int, bb : int, cc : int, dd : int, ee : int");
+ sch = pig.dumpSchema("f");
+ assertEquals("Checking expected schema", expectedSch, sch);
+ Iterator<Tuple> it = pig.openIterator("f");
+
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(10,20,30,40,50)",
+ "(11,21,31,41,51)",
+ });
+ Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+ }
+
+
+}
Modified: pig/trunk/test/org/apache/pig/test/TestSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSchema.java?rev=1080642&r1=1080641&r2=1080642&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSchema.java Fri Mar 11 16:29:42 2011
@@ -646,22 +646,22 @@ public class TestSchema extends TestCase
}
}
-// public void testSchemaSerialization() throws IOException {
-// MiniCluster cluster = MiniCluster.buildCluster();
-// PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-// String inputFileName = "testSchemaSerialization-input.txt";
-// String[] inputData = new String[] { "foo\t1", "hello\t2" };
-// Util.createInputFile(cluster, inputFileName, inputData);
-// String script = "a = load '"+ inputFileName +"' as (f1:chararray, f2:int);" +
-// " b = group a all; c = foreach b generate org.apache.pig.test.InputSchemaUDF(a);";
-// Util.registerMultiLineQuery(pigServer, script);
-// Iterator<Tuple> it = pigServer.openIterator("c");
-// while(it.hasNext()) {
-// Tuple t = it.next();
-// assertEquals("{a: {(f1: chararray,f2: int)}}", t.get(0));
-// }
-// cluster.shutDown();
-// }
+ public void testSchemaSerialization() throws IOException {
+ MiniCluster cluster = MiniCluster.buildCluster();
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ String inputFileName = "testSchemaSerialization-input.txt";
+ String[] inputData = new String[] { "foo\t1", "hello\t2" };
+ Util.createInputFile(cluster, inputFileName, inputData);
+ String script = "a = load '"+ inputFileName +"' as (f1:chararray, f2:int);" +
+ " b = group a all; c = foreach b generate org.apache.pig.test.InputSchemaUDF(a);";
+ Util.registerMultiLineQuery(pigServer, script);
+ Iterator<Tuple> it = pigServer.openIterator("c");
+ while(it.hasNext()) {
+ Tuple t = it.next();
+ assertEquals("{a: {(f1: chararray,f2: int)}}", t.get(0));
+ }
+ cluster.shutDown();
+ }
@Test
// See PIG-730