You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/11/15 00:38:54 UTC

svn commit: r1409562 - in /pig/trunk: ./ src/org/apache/pig/newplan/logical/expression/ src/org/apache/pig/parser/ test/org/apache/pig/parser/

Author: jcoveney
Date: Wed Nov 14 23:38:53 2012
New Revision: 1409562

URL: http://svn.apache.org/viewvc?rev=1409562&view=rev
Log:
PIG-2937: generated field in nested foreach does not inherit the variable name as the field name (jcoveney)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1409562&r1=1409561&r2=1409562&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Nov 14 23:38:53 2012
@@ -76,6 +76,8 @@ PIG-1891 Enable StoreFunc to make intell
 
 IMPROVEMENTS
 
+PIG-2937: generated field in nested foreach does not inherit the variable name as the field name (jcoveney)
+
 PIG-3019: Need a target in build.xml for source releases (gates)
 
 PIG-2832: org.apache.pig.pigunit.pig.PigServer does not initialize udf.import.list of PigContext (prkommireddi via rohini)

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java?rev=1409562&r1=1409561&r2=1409562&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java Wed Nov 14 23:38:53 2012
@@ -29,7 +29,7 @@ import org.apache.pig.parser.SourceLocat
 public class BinCondExpression extends LogicalExpression {
 
     /**
-     * Will add this operator to the plan and connect it to the 
+     * Will add this operator to the plan and connect it to the
      * left and right hand side operators and the condition operator
      * @param plan plan this operator is part of
      * @param lhs expression on its left hand side
@@ -45,7 +45,7 @@ public class BinCondExpression extends L
         plan.connect(this, lhs);
         plan.connect(this, rhs);
     }
-    
+
     /**
      * Returns the operator which handles this condition
      * @return expression which handles the condition
@@ -58,16 +58,16 @@ public class BinCondExpression extends L
     /**
      * Get the left hand side of this expression.
      * @return expression on the left hand side
-     * @throws FrontendException 
+     * @throws FrontendException
      */
     public LogicalExpression getLhs() throws FrontendException {
-        return (LogicalExpression)plan.getSuccessors(this).get(1);        
+        return (LogicalExpression)plan.getSuccessors(this).get(1);
     }
 
     /**
      * Get the right hand side of this expression.
      * @return expression on the right hand side
-     * @throws FrontendException 
+     * @throws FrontendException
      */
     public LogicalExpression getRhs() throws FrontendException {
         return (LogicalExpression)plan.getSuccessors(this).get(2);
@@ -83,32 +83,32 @@ public class BinCondExpression extends L
         }
         ((LogicalExpressionVisitor)v).visit(this);
     }
-    
+
     @Override
     public boolean isEqual(Operator other) throws FrontendException {
         if (other != null && other instanceof BinCondExpression) {
             BinCondExpression ao = (BinCondExpression)other;
-            return ao.getCondition().isEqual(getCondition()) && 
+            return ao.getCondition().isEqual(getCondition()) &&
             ao.getLhs().isEqual(getLhs()) && ao.getRhs().isEqual(getRhs());
         } else {
             return false;
         }
     }
-    
+
     @Override
     public LogicalSchema.LogicalFieldSchema getFieldSchema() throws FrontendException {
         if (fieldSchema!=null)
             return fieldSchema;
-        
+
         //TypeCheckingExpVisitor will ensure that lhs and rhs have same schema
         LogicalFieldSchema argFs = getLhs().getFieldSchema();
         fieldSchema = argFs.deepCopy();
         fieldSchema.resetUid();
-        
+
         uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
         return fieldSchema;
     }
-  
+
     @Override
     public LogicalExpression deepCopy(LogicalExpressionPlan lgExpPlan) throws FrontendException {
         LogicalExpression copy = new BinCondExpression(

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java?rev=1409562&r1=1409561&r2=1409562&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java Wed Nov 14 23:38:53 2012
@@ -27,11 +27,11 @@ import org.apache.pig.newplan.logical.re
 
 /**
  * Logical representation of expression operators.  Expression operators have
- * a data type and a uid.  Uid is a unique id for each expression. 
+ * a data type and a uid.  Uid is a unique id for each expression.
  *
  */
 public abstract class LogicalExpression extends Operator {
-    
+
     static long nextUid = 1;
     protected LogicalSchema.LogicalFieldSchema fieldSchema;
     protected LogicalSchema.LogicalFieldSchema uidOnlyFieldSchema;
@@ -45,14 +45,22 @@ public abstract class LogicalExpression 
         nextUid = 1;
     }
     /**
-     * 
+     *
      * @param name of the operator
      * @param plan LogicalExpressionPlan this is part of
      */
     public LogicalExpression(String name, OperatorPlan plan) {
         super(name, plan);
     }
-    
+
+    /**
+     * This is a convenience method to avoid the side-effectful nature of getFieldSchema().
+     * It simply returns whether or not fieldSchema is currently null.
+     */
+    public boolean hasFieldSchema() {
+        return fieldSchema != null;
+    }
+
     /**
      * Get the field schema for the output of this expression operator.  This does
      * not merely return the field schema variable.  If schema is not yet set, this
@@ -62,11 +70,11 @@ public abstract class LogicalExpression 
      * @throws FrontendException
      */
     abstract public LogicalSchema.LogicalFieldSchema getFieldSchema() throws FrontendException;
-    
+
     public void resetFieldSchema() {
         fieldSchema = null;
     }
-    
+
     /**
      * Get the data type for this expression.
      * @return data type, one of the static bytes of DataType
@@ -76,7 +84,7 @@ public abstract class LogicalExpression 
             return getFieldSchema().type;
         return DataType.BYTEARRAY;
     }
-    
+
     public String toString() {
         StringBuilder msg = new StringBuilder();
         msg.append("(Name: " + name + " Type: ");
@@ -93,7 +101,7 @@ public abstract class LogicalExpression 
 
         return msg.toString();
     }
-    
+
     public void neverUseForRealSetFieldSchema(LogicalFieldSchema fs) throws FrontendException {
         fieldSchema = fs;
         uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
@@ -118,4 +126,4 @@ public abstract class LogicalExpression 
     public void resetUid() {
         uidOnlyFieldSchema = null;
     }
-}
+}
\ No newline at end of file

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java?rev=1409562&r1=1409561&r2=1409562&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java Wed Nov 14 23:38:53 2012
@@ -42,7 +42,7 @@ import org.apache.pig.parser.SourceLocat
  *
  */
 public class ProjectExpression extends ColumnExpression {
-    
+
     private int input; // Which input of the relational operator this project
                        // is projecting from.  Count is zero based.  So if this
                        // project is in a filter the input number will always
@@ -52,20 +52,20 @@ public class ProjectExpression extends C
     private int col; // The column in the input which the project references.
                      // Count is zero based.
     private String alias; // The alias of the projected field.
-    
+
     private LogicalRelationalOperator attachedRelationalOp;
 
-    //fields for range projection. 
+    //fields for range projection.
     private boolean isRangeProject = false;
     //start and end columns in range. endCol value of -1 represents everything upto end
     private int startCol = -1;
     private int endCol = -2;
-    
+
     private String startAlias;
     private String endAlias;
 
-    
-    
+
+
     /**
      * Adds projection to the plan.
      * @param plan LogicalExpressionPlan this projection will be a part of
@@ -89,7 +89,7 @@ public class ProjectExpression extends C
      * @param inputNum
      * @param alias
      * @param attachedRelationalOp
-     * @throws FrontendException 
+     * @throws FrontendException
      */
     public ProjectExpression(OperatorPlan plan, int inputNum, String alias,
             LogicalRelationalOperator attachedRelationalOp) {
@@ -103,7 +103,7 @@ public class ProjectExpression extends C
     /**
      * Constructor for range projection
      * Adds projection to the plan.
-     * The start and end alias/column-number should be set separately. 
+     * The start and end alias/column-number should be set separately.
      * @param plan
      * @param inputNum
      * @param attachedRelationalOp
@@ -133,7 +133,7 @@ public class ProjectExpression extends C
         this.startAlias = projExpr.startAlias;
         this.endAlias = projExpr.endAlias;
         plan.add(this);
-        
+
     }
 
     /**
@@ -155,7 +155,7 @@ public class ProjectExpression extends C
                 "range projection (..) " + startCol;
                 throw new PlanValidationException(this, msg, 2270, PigException.BUG);
             }
-            
+
             if(endCol > 0 && startCol > endCol){
                 String msg = "start column appears after end column in " +
                 "range projection (..) . Start column position " + startCol +
@@ -166,13 +166,13 @@ public class ProjectExpression extends C
             setColNum(findColNum(alias));
         }
     }
-    
+
     private int findColNum(String alias) throws FrontendException {
         LogicalPlan lp = (LogicalPlan)attachedRelationalOp.getPlan();
         List<Operator> inputs = lp.getPredecessors( attachedRelationalOp );
         LogicalRelationalOperator input = (LogicalRelationalOperator)inputs.get( getInputNum() );
         LogicalSchema inputSchema = input.getSchema();
-        
+
         if( alias != null ) {
             int colNum = inputSchema == null ? -1 : inputSchema.getFieldPosition( alias );
             if( colNum == -1 ) {
@@ -187,15 +187,15 @@ public class ProjectExpression extends C
             int col = getColNum();
             if( inputSchema != null && col >= inputSchema.size() ) {
                 throw new PlanValidationException( this,
-                        "Out of bound access. Trying to access non-existent column: " + 
-                        col + ". Schema " +  inputSchema.toString(false) + 
+                        "Out of bound access. Trying to access non-existent column: " +
+                        col + ". Schema " +  inputSchema.toString(false) +
                         " has " + inputSchema.size() + " column(s)." , 1000);
             }
             return col;
         }
     }
 
-    
+
     /**
      * @link org.apache.pig.newplan.Operator#accept(org.apache.pig.newplan.PlanVisitor)
      */
@@ -217,12 +217,12 @@ public class ProjectExpression extends C
     public int getInputNum() {
         return input;
     }
-    
-   
+
+
     public void setInputNum(int inputNum) {
         input = inputNum;
     }
-    
+
     /**
      * Column number this project references.  The column number is the column
      * in the relational operator that contains this expression.  The count
@@ -235,21 +235,21 @@ public class ProjectExpression extends C
         }
         return col;
     }
-    
+
     public String getColAlias() {
         return alias;
     }
-    
+
     /**
      * Set the column number for this project.  This should only be called by
-     * ProjectionPatcher.  Stupid Java needs friends.  
+     * ProjectionPatcher.  Stupid Java needs friends.
      * @param colNum new column number for projection
      */
     public void setColNum(int colNum) {
         col = colNum;
         alias = null; // Once the column number is set, alias is no longer needed.
     }
-    
+
     public boolean isProjectStar() {
         return col<0;
     }
@@ -257,19 +257,19 @@ public class ProjectExpression extends C
     public boolean isRangeProject() {
         return isRangeProject;
     }
-    
+
     public boolean isRangeOrStarProject(){
         return isProjectStar() || isRangeProject();
     }
-    
+
     @Override
     public LogicalSchema.LogicalFieldSchema getFieldSchema() throws FrontendException {
         if (fieldSchema!=null)
             return fieldSchema;
         LogicalRelationalOperator referent = findReferent();
-        
+
         LogicalSchema schema = referent.getSchema();
-        
+
         if (attachedRelationalOp instanceof LOGenerate && plan.getSuccessors(this)==null) {
             if (!(findReferent() instanceof LOInnerLoad)||
                     ((LOInnerLoad)findReferent()).sourceIsBag()) {
@@ -278,7 +278,7 @@ public class ProjectExpression extends C
                 Pair<List<LOInnerLoad>, Boolean> innerLoadsPair = LOForEach.findReacheableInnerLoadFromBoundaryProject(this);
                 List<LOInnerLoad> innerLoads = innerLoadsPair.first;
                 boolean needNewUid = innerLoadsPair.second;
-                
+
                 // pull tuple information from innerload
                 if (innerLoads.get(0).getProjection().getFieldSchema().schema!=null &&
                         innerLoads.get(0).getProjection().getFieldSchema().type==DataType.BAG) {
@@ -342,10 +342,10 @@ public class ProjectExpression extends C
                 } else {
                     fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BYTEARRAY);
                 }
-                
+
                 if (fieldSchema!=null)
                     uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
-            } 
+            }
             else {
                 int index = -1;
                 if (!isRangeOrStarProject() && uidOnlyFieldSchema!=null) {
@@ -364,7 +364,7 @@ public class ProjectExpression extends C
                 }
                 if (index==-1)
                     index = col;
-                
+
                 if (!isRangeOrStarProject()) {
                     if (schema!=null && schema.size()>index)
                         fieldSchema = schema.getField(index);
@@ -403,22 +403,22 @@ public class ProjectExpression extends C
         if (preds == null || input >= preds.size()) {
             throw new FrontendException("Projection with nothing to reference!", 2225);
         }
-        
+
         LogicalRelationalOperator pred =
             (LogicalRelationalOperator)preds.get(input);
         if (pred == null) {
-            throw new FrontendException("Cannot fine reference for " + this, 2226);
+            throw new FrontendException("Cannot find reference for " + this, 2226);
         }
         return pred;
     }
-    
+
     @Override
     public boolean isEqual(Operator other) throws FrontendException {
         if (other != null && other instanceof ProjectExpression) {
             ProjectExpression po = (ProjectExpression)other;
             if (po.input != input || po.col != col)
                 return false;
-            
+
             Operator mySucc = getPlan().getSuccessors(this)!=null?
                     getPlan().getSuccessors(this).get(0):null;
             Operator theirSucc = other.getPlan().getSuccessors(other)!=null?
@@ -432,7 +432,7 @@ public class ProjectExpression extends C
             return false;
         }
     }
-    
+
     public String toString() {
         StringBuilder msg = new StringBuilder();
         if (fieldSchema!=null && fieldSchema.alias!=null)
@@ -460,18 +460,18 @@ public class ProjectExpression extends C
 
         return msg.toString();
     }
-    
+
     public LogicalRelationalOperator getAttachedRelationalOp() {
         return attachedRelationalOp;
     }
-    
+
     public void setAttachedRelationalOp(LogicalRelationalOperator attachedRelationalOp) {
         this.attachedRelationalOp = attachedRelationalOp;
     }
-    
+
     @Override
     public byte getType() throws FrontendException {
-        // for boundary project, if 
+        // for boundary project, if
         if (getFieldSchema()==null) {
             if (attachedRelationalOp instanceof LOGenerate && findReferent() instanceof
                     LOInnerLoad) {
@@ -513,7 +513,7 @@ public class ProjectExpression extends C
 
     /**
      * @param startAlias
-     * @throws FrontendException 
+     * @throws FrontendException
      */
     public void setStartAlias(String startAlias) throws FrontendException {
        this.startAlias = startAlias;
@@ -521,7 +521,7 @@ public class ProjectExpression extends C
 
     /**
      * @param endAlias
-     * @throws FrontendException 
+     * @throws FrontendException
      */
     public void setEndAlias(String endAlias) throws FrontendException {
         this.endAlias = endAlias;
@@ -535,13 +535,13 @@ public class ProjectExpression extends C
                 this.getColNum(),
                 this.getAttachedRelationalOp());
         copy.setLocation( new SourceLocation( location ) );
-        copy.alias = alias; 
+        copy.alias = alias;
         copy.isRangeProject = this.isRangeProject;
         copy.startCol = this.startCol;
         copy.endCol = this.endCol;
         copy.startAlias = this.startAlias;
         copy.endAlias = this.endAlias;
-        
+
         return copy;
     }
 

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1409562&r1=1409561&r2=1409562&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Wed Nov 14 23:38:53 2012
@@ -107,13 +107,13 @@ public class LogicalPlanBuilder {
     private IntStream intStream;
     private int storeIndex = 0;
     private int loadIndex = 0;
-    
+
     private static NodeIdGenerator nodeIdGen = NodeIdGenerator.getGenerator();
-    
+
     public static long getNextId(String scope) {
         return nodeIdGen.getNextNodeId( scope );
     }
-    
+
     LogicalPlanBuilder(PigContext pigContext, String scope, Map<String, String> fileNameMap,
             IntStream input) {
         this.pigContext = pigContext;
@@ -121,7 +121,7 @@ public class LogicalPlanBuilder {
         this.fileNameMap = fileNameMap;
         this.intStream = input;
     }
-    
+
     LogicalPlanBuilder(IntStream input) throws ExecException {
         pigContext = new PigContext( ExecType.LOCAL, new Properties() );
         pigContext.connect();
@@ -129,7 +129,7 @@ public class LogicalPlanBuilder {
         this.fileNameMap = new HashMap<String, String>();
         this.intStream = input;
     }
-    
+
     Operator lookupOperator(String alias) {
         return operators.get( alias );
     }
@@ -137,27 +137,27 @@ public class LogicalPlanBuilder {
     FuncSpec lookupFunction(String alias) {
         return pigContext.getFuncSpecFromAlias( alias );
     }
-    
+
     StreamingCommand lookupCommand(String alias) {
         return pigContext.getCommandForAlias( alias );
     }
-    
+
     void defineCommand(String alias, StreamingCommand command) {
         pigContext.registerStreamCmd( alias, command );
     }
-    
+
     void defineFunction(String alias, FuncSpec fs) {
         pigContext.registerFunction( alias, fs );
     }
-    
+
     LogicalPlan getPlan() {
         return plan;
     }
-    
+
     Map<String, Operator> getOperators() {
         return operators;
     }
-    
+
     LOFilter createFilterOp() {
         return new LOFilter( plan );
     }
@@ -165,26 +165,26 @@ public class LogicalPlanBuilder {
     LOLimit createLimitOp() {
         return new LOLimit( plan );
     }
-    
+
     LOFilter createSampleOp() {
         return new LOFilter( plan, true );
     }
-    
-    String buildFilterOp(SourceLocation loc, LOFilter op, String alias, 
+
+    String buildFilterOp(SourceLocation loc, LOFilter op, String alias,
             String inputAlias, LogicalExpressionPlan expr)
                     throws ParserValidationException {
-        
+
         op.setFilterPlan( expr );
-        alias = buildOp( loc, op, alias, inputAlias, null ); // it should actually return same alias 
+        alias = buildOp( loc, op, alias, inputAlias, null ); // it should actually return same alias
         try {
             (new ProjStarInUdfExpander(op.getPlan())).visit(op);
             new SchemaResetter(op.getPlan(), true).visit(op);
         } catch (FrontendException e) {
             throw new ParserValidationException( intStream, loc, e );
-        }   
+        }
         return alias;
     }
-    
+
     String buildDistinctOp(SourceLocation loc, String alias, String inputAlias, String partitioner) throws ParserValidationException {
         LODistinct op = new LODistinct( plan );
         return buildOp( loc, op, alias, inputAlias, partitioner );
@@ -194,16 +194,16 @@ public class LogicalPlanBuilder {
         LOLimit op = new LOLimit( plan, limit );
         return buildOp( loc, op, alias, inputAlias, null );
     }
-    
+
     String buildLimitOp(SourceLocation loc, LOLimit op, String alias, String inputAlias, LogicalExpressionPlan expr) throws ParserValidationException {
         op.setLimitPlan(expr);
         return buildOp(loc, op, alias, inputAlias, null);
     }
-    
+
     String buildSampleOp(SourceLocation loc, String alias, String inputAlias, double value,
             SourceLocation valLoc)
                     throws ParserValidationException {
-        
+
         LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
         //  Generate a filter condition.
         LogicalExpression konst = new ConstantExpression( filterPlan, value);
@@ -213,16 +213,16 @@ public class LogicalPlanBuilder {
         LOFilter filter = new LOFilter( plan, true );
         return buildFilterOp( loc, filter, alias, inputAlias, filterPlan );
     }
-    
+
     String buildSampleOp(SourceLocation loc, LOFilter filter, String alias, String inputAlias,
             LogicalExpressionPlan samplePlan, LogicalExpression expr)
                     throws ParserValidationException {
-        
+
         UserFuncExpression udf = new UserFuncExpression( samplePlan, new FuncSpec( RANDOM.class.getName() ) );
         new LessThanExpression( samplePlan, udf, expr );
         return buildFilterOp( loc, filter, alias, inputAlias, samplePlan );
     }
-    
+
     String buildUnionOp(SourceLocation loc, String alias, List<String> inputAliases, boolean onSchema) throws ParserValidationException {
         LOUnion op = new LOUnion( plan, onSchema );
         return buildOp( loc, op, alias, inputAliases, null );
@@ -232,17 +232,17 @@ public class LogicalPlanBuilder {
         LOSplit op = new LOSplit( plan );
         return buildOp( loc, op, null, inputAlias, null );
     }
-    
+
     LOSplitOutput createSplitOutputOp() {
         return  new LOSplitOutput( plan );
     }
-    
+
     String buildSplitOutputOp(SourceLocation loc, LOSplitOutput op, String alias, String inputAlias,
             LogicalExpressionPlan filterPlan) throws ParserValidationException {
         op.setFilterPlan( filterPlan );
         return buildOp ( loc, op, alias, inputAlias, null );
     }
-    
+
     String buildSplitOtherwiseOp(SourceLocation loc, LOSplitOutput op, String alias, String inputAlias)
             throws ParserValidationException, PlanGenerationFailureException {
         LogicalExpressionPlan splitPlan = new LogicalExpressionPlan();
@@ -284,17 +284,17 @@ public class LogicalPlanBuilder {
         op.setFilterPlan(splitPlan);
         return buildOp(loc, op, alias, inputAlias, null);
     }
-    
+
     String buildCrossOp(SourceLocation loc, String alias, List<String> inputAliases, String partitioner) throws ParserValidationException {
         LOCross op = new LOCross( plan );
         return buildOp ( loc, op, alias, inputAliases, partitioner );
     }
-    
+
     LOSort createSortOp() {
         return new LOSort( plan );
     }
-    
-    String buildSortOp(SourceLocation loc, LOSort sort, String alias, String inputAlias, List<LogicalExpressionPlan> plans, 
+
+    String buildSortOp(SourceLocation loc, LOSort sort, String alias, String inputAlias, List<LogicalExpressionPlan> plans,
             List<Boolean> ascFlags, FuncSpec fs) throws ParserValidationException {
         sort.setSortColPlans( plans );
         sort.setUserFunc( fs );
@@ -342,15 +342,15 @@ public class LogicalPlanBuilder {
         else {
             op.pinOption(LOJoin.OPTION_JOIN);
         }
-        
+
         int inputCount = inputAliases.size();
-        
+
         if( jt == JOINTYPE.SKEWED ) {
             if( partitioner != null ) {
                 throw new ParserValidationException( intStream, loc,
                         "Custom Partitioner is not supported for skewed join" );
             }
-            
+
             if( inputCount != 2 ) {
                 throw new ParserValidationException( intStream, loc,
                         "Skewed join can only be applied for 2-way joins" );
@@ -394,7 +394,7 @@ public class LogicalPlanBuilder {
 	    throw new ParserValidationException(intStream, loc, e);
 	}
     }
-    
+
     LOCube createCubeOp() {
 	return new LOCube(plan);
     }
@@ -772,20 +772,20 @@ public class LogicalPlanBuilder {
 	}
 
     }
-	
+
     LOCogroup createGroupOp() {
         return new LOCogroup( plan );
     }
-    
-    String buildGroupOp(SourceLocation loc, LOCogroup op, String alias, List<String> inputAliases, 
+
+    String buildGroupOp(SourceLocation loc, LOCogroup op, String alias, List<String> inputAliases,
         MultiMap<Integer, LogicalExpressionPlan> expressionPlans, GROUPTYPE gt, List<Boolean> innerFlags,
         String partitioner) throws ParserValidationException {
         if( gt == GROUPTYPE.COLLECTED ) {
             if( inputAliases.size() > 1 ) {
-                throw new ParserValidationException( intStream, loc, 
+                throw new ParserValidationException( intStream, loc,
                         "Collected group is only supported for single input" );
             }
-            
+
             List<LogicalExpressionPlan> exprPlans = expressionPlans.get( 0 );
             for( LogicalExpressionPlan exprPlan : exprPlans ) {
                 Iterator<Operator> it = exprPlan.getOperators();
@@ -797,7 +797,7 @@ public class LogicalPlanBuilder {
                 }
             }
         }
-        
+
         boolean[] flags = new boolean[innerFlags.size()];
         for( int i = 0; i < innerFlags.size(); i++ ) {
             flags[i] = innerFlags.get( i );
@@ -846,15 +846,15 @@ public class LogicalPlanBuilder {
         return buildOp( loc, op, alias, new ArrayList<String>(), null );
     }
 
-    private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias, 
+    private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias,
     		String inputAlias, String partitioner) throws ParserValidationException {
         List<String> inputAliases = new ArrayList<String>();
         if( inputAlias != null )
             inputAliases.add( inputAlias );
         return buildOp( loc, op, alias, inputAliases, partitioner );
     }
-    
-    private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias, 
+
+    private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias,
     		List<String> inputAliases, String partitioner) throws ParserValidationException {
         setAlias( op, alias );
         setPartitioner( op, partitioner );
@@ -868,7 +868,7 @@ public class LogicalPlanBuilder {
             plan.connect( pred, op );
         }
         operators.put( op.getAlias(), op );
-        pigContext.setLastAlias(op.getAlias());	
+        pigContext.setLastAlias(op.getAlias());
         return op.getAlias();
     }
 
@@ -908,15 +908,15 @@ public class LogicalPlanBuilder {
     private String newOperatorKey() {
         return new OperatorKey( scope, getNextId() ).toString();
     }
-    
+
     public static String newOperatorKey(String scope) {
         return new OperatorKey( scope, getNextId(scope)).toString();
     }
-    
+
     LOForEach createForeachOp() {
         return new LOForEach( plan );
     }
-    
+
     String buildForeachOp(SourceLocation loc, LOForEach op, String alias, String inputAlias, LogicalPlan innerPlan)
     throws ParserValidationException {
         op.setInnerPlan( innerPlan );
@@ -924,30 +924,43 @@ public class LogicalPlanBuilder {
         expandAndResetVisitor(loc, op);
         return alias;
     }
-    
+
     LOGenerate createGenerateOp(LogicalPlan plan) {
         return new LOGenerate( plan );
     }
-    
+
     void buildGenerateOp(SourceLocation loc, LOForEach foreach, LOGenerate gen,
             Map<String, Operator> operators,
             List<LogicalExpressionPlan> exprPlans, List<Boolean> flattenFlags,
             List<LogicalSchema> schemas)
-    throws ParserValidationException{
-        
+    throws ParserValidationException {
+
         boolean[] flags = new boolean[ flattenFlags.size() ];
         for( int i = 0; i < flattenFlags.size(); i++ )
             flags[i] = flattenFlags.get( i );
         LogicalPlan innerPlan = (LogicalPlan)gen.getPlan();
         ArrayList<Operator> inputs = new ArrayList<Operator>();
+        int idx = 0;
         for( LogicalExpressionPlan exprPlan : exprPlans ) {
+            LogicalExpression expr = (LogicalExpression)exprPlan.getSources().get(0);
+            LogicalSchema userSchema = schemas.get(idx);
+            if (userSchema == null && expr.hasFieldSchema()) {
+                LogicalSchema ls = new LogicalSchema();
+                try {
+                    ls.addField(expr.getFieldSchema());
+                    schemas.set(idx, ls);
+                } catch (FrontendException e) {
+                    // if we get an exception, then we have no schema to set
+                }
+            }
+            idx++;
             try {
                 processExpressionPlan( foreach, innerPlan, exprPlan, operators, inputs );
             } catch (FrontendException e) {
                 throw new ParserValidationException(intStream, loc, e);
             }
         }
-        
+
         gen.setOutputPlans( exprPlans );
         gen.setFlattenFlags( flags );
         gen.setUserDefinedSchema( schemas );
@@ -957,21 +970,21 @@ public class LogicalPlanBuilder {
             innerPlan.connect( input, gen );
         }
     }
-    
+
     /**
      * Process expression plans of LOGenerate and set inputs relation
      * for the ProjectExpression
-     * @param foreach 
+     * @param foreach
      * @param lp Logical plan in which the LOGenerate is in
      * @param plan One of the output expression of the LOGenerate
      * @param operators All logical operators in lp;
      * @param inputs  inputs of the LOGenerate
-     * @throws FrontendException 
+     * @throws FrontendException
      */
     private static void processExpressionPlan(LOForEach foreach,
-                                      LogicalPlan lp,  
-                                      LogicalExpressionPlan plan,  
-                                      Map<String, Operator> operators,  
+                                      LogicalPlan lp,
+                                      LogicalExpressionPlan plan,
+                                      Map<String, Operator> operators,
                                       ArrayList<Operator> inputs ) throws FrontendException {
         Iterator<Operator> it = plan.getOperators();
         while( it.hasNext() ) {
@@ -981,7 +994,7 @@ public class LogicalPlanBuilder {
                 ProjectExpression projExpr = (ProjectExpression)sink;
                 String colAlias = projExpr.getColAlias();
                 if( projExpr.isRangeProject()){
-                 
+
                     LOInnerLoad innerLoad = new LOInnerLoad( lp, foreach,
                             new ProjectExpression(projExpr, new LogicalExpressionPlan())
                     );
@@ -994,7 +1007,7 @@ public class LogicalPlanBuilder {
                         // this means the project expression refers to a relation
                         // in the nested foreach
 
-                        //add the relation to inputs of LOGenerate and set 
+                        //add the relation to inputs of LOGenerate and set
                         // projection input
                         int index = inputs.indexOf( op );
                         if( index == -1 ) {
@@ -1019,7 +1032,7 @@ public class LogicalPlanBuilder {
             }
         }
     }
-    
+
     private static void setupInnerLoadAndProj(LOInnerLoad innerLoad,
             ProjectExpression projExpr, LogicalPlan lp,
             ArrayList<Operator> inputs) {
@@ -1028,10 +1041,10 @@ public class LogicalPlanBuilder {
         projExpr.setColNum( -1 ); // Projection Expression on InnerLoad is always (*).
         lp.add( innerLoad );
         inputs.add( innerLoad );
-        
+
     }
 
-    Operator buildNestedOperatorInput(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach, 
+    Operator buildNestedOperatorInput(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach,
             Map<String, Operator> operators, LogicalExpression expr)
     throws NonProjectExpressionException, ParserValidationException {
         OperatorPlan plan = expr.getPlan();
@@ -1056,7 +1069,7 @@ public class LogicalPlanBuilder {
         }
         return op;
     }
-    
+
     private LOInnerLoad createInnerLoad(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach,
             String colAlias) throws ParserValidationException {
         try {
@@ -1072,7 +1085,7 @@ public class LogicalPlanBuilder {
         StreamingCommand command = null;
         try {
             command = buildCommand( loc, cmd );
-            
+
             // Process ship paths
             if( shipPaths != null ) {
                 if( shipPaths.size() == 0 ) {
@@ -1082,25 +1095,25 @@ public class LogicalPlanBuilder {
                         command.addPathToShip( path );
                 }
             }
-            
+
             // Process cache paths
             if( cachePaths != null ) {
                 for( String path : cachePaths )
                     command.addPathToCache( path );
             }
-            
+
             // Process input handle specs
             if( inputHandleSpecs != null ) {
                 for( HandleSpec spec : inputHandleSpecs )
                     command.addHandleSpec( Handle.INPUT, spec );
             }
-            
+
             // Process output handle specs
             if( outputHandleSpecs != null ) {
                 for( HandleSpec spec : outputHandleSpecs )
                     command.addHandleSpec( Handle.OUTPUT, spec );
             }
-            
+
             // error handling
             if( logDir != null )
                 command.setLogDir( logDir );
@@ -1109,10 +1122,10 @@ public class LogicalPlanBuilder {
         } catch(IOException e) {
             throw new PlanGenerationFailureException( intStream, loc, e );
         }
-        
+
         return command;
     }
-    
+
     StreamingCommand buildCommand(SourceLocation loc, String cmd) throws RecognitionException {
         try {
             String[] args = StreamingCommandUtils.splitArgs( cmd );
@@ -1124,7 +1137,7 @@ public class LogicalPlanBuilder {
             throw new InvalidCommandException( intStream, loc, cmd );
         }
     }
-    
+
     String buildStreamOp(SourceLocation loc, String alias, String inputAlias, StreamingCommand command,
             LogicalSchema schema, IntStream input)
     throws RecognitionException {
@@ -1135,7 +1148,7 @@ public class LogicalPlanBuilder {
             throw new PlanGenerationFailureException( input, loc, ex );
         }
     }
-    
+
     String buildNativeOp(SourceLocation loc, String inputJar, String cmd,
             List<String> paths, String storeAlias, String loadAlias, IntStream input)
     throws RecognitionException {
@@ -1157,31 +1170,31 @@ public class LogicalPlanBuilder {
             throw new InvalidPathException( input, loc, e);
         }
     }
-    
+
     void setAlias(LogicalRelationalOperator op, String alias) {
         if( alias == null )
             alias = newOperatorKey();
         op.setAlias( alias );
     }
-    
+
     void setParallel(LogicalRelationalOperator op, Integer parallel) {
         if( parallel != null ) {
             op.setRequestedParallelism( pigContext.getExecType() == ExecType.LOCAL ? 1 : parallel );
         }
     }
-    
+
     static void setPartitioner(LogicalRelationalOperator op, String partitioner) {
         if( partitioner != null )
             op.setCustomPartitioner( partitioner );
     }
-    
+
     FuncSpec buildFuncSpec(SourceLocation loc, String funcName, List<String> args, byte ft) throws RecognitionException {
         String[] argArray = new String[args.size()];
         FuncSpec funcSpec = new FuncSpec( funcName, args.size() == 0 ? null : args.toArray( argArray ) );
         validateFuncSpec( loc, funcSpec, ft );
         return funcSpec;
     }
-    
+
     private void validateFuncSpec(SourceLocation loc, FuncSpec funcSpec, byte ft) throws RecognitionException {
         switch (ft) {
         case FunctionType.COMPARISONFUNC:
@@ -1197,15 +1210,15 @@ public class LogicalPlanBuilder {
             }
         }
     }
-    
+
     static String unquote(String s) {
         return StringUtils.unescapeInputString( s.substring(1, s.length() - 1 ) );
     }
-    
+
     static int undollar(String s) {
-        return Integer.parseInt( s.substring( 1, s.length() ) );    
+        return Integer.parseInt( s.substring( 1, s.length() ) );
     }
-    
+
     /**
      * Parse the long given as a string such as "34L".
      */
@@ -1218,23 +1231,23 @@ public class LogicalPlanBuilder {
         TupleFactory tf = TupleFactory.getInstance();
         return tf.newTuple( objList );
     }
-    
+
     static DataBag createDataBag() {
         BagFactory bagFactory = BagFactory.getInstance();
         return bagFactory.newDefaultBag();
     }
-    
+
     /**
      *  Build a project expression in foreach inner plan.
      *  The only difference here is that the projection can be for an expression alias, for which
      *  we will return whatever the expression alias represents.
-     * @throws RecognitionException 
+     * @throws RecognitionException
      */
     LogicalExpression buildProjectExpr(SourceLocation loc, LogicalExpressionPlan plan, LogicalRelationalOperator op,
             Map<String, LogicalExpressionPlan> exprPlans, String colAlias, int col)
     throws RecognitionException {
         ProjectExpression result = null;
-        
+
         if( colAlias != null ) {
             LogicalExpressionPlan exprPlan = exprPlans.get( colAlias );
             if( exprPlan != null ) {
@@ -1246,7 +1259,7 @@ public class LogicalPlanBuilder {
                     throw new PlanGenerationFailureException( intStream, loc, ex );
                 }
                 // The projected alias is actually expression alias, so the projections in the represented
-                // expression doesn't have any operator associated with it. We need to set it when we 
+                // expression doesn't have any operator associated with it. We need to set it when we
                 // substitute the expression alias with the its expression.
                 if( op != null ) {
                     Iterator<Operator> it = plan.getOperators();
@@ -1258,7 +1271,17 @@ public class LogicalPlanBuilder {
                         }
                     }
                 }
-                return (LogicalExpression)planCopy.getSources().get( 0 );// get the root of the plan
+                LogicalExpression root = (LogicalExpression)planCopy.getSources().get( 0 );// get the root of the plan
+                LogicalFieldSchema schema;
+                try {
+                    schema = root.getFieldSchema();
+                    if (schema.alias == null) {
+                        schema.alias = colAlias;
+                    }
+                } catch (FrontendException e) {
+                    // Sometimes it can throw an exception. If it does, then there is no schema to get
+                }
+                return root;
             } else {
                 result = new ProjectExpression( plan, 0, colAlias, op );
                 result.setLocation( loc );
@@ -1272,9 +1295,9 @@ public class LogicalPlanBuilder {
 
     /**
      * Build a project expression for a projection present in global plan (not in nested foreach plan).
-     * @throws ParserValidationException 
+     * @throws ParserValidationException
      */
-    LogicalExpression buildProjectExpr(SourceLocation loc, 
+    LogicalExpression buildProjectExpr(SourceLocation loc,
             LogicalExpressionPlan plan, LogicalRelationalOperator relOp,
             int input, String colAlias, int col)
     throws ParserValidationException {
@@ -1288,28 +1311,28 @@ public class LogicalPlanBuilder {
 
     /**
      * Build a project expression that projects a range of columns
-     * @param loc 
+     * @param loc
      * @param plan
      * @param relOp
      * @param input
-     * @param startExpr the first expression to be projected, null 
+     * @param startExpr the first expression to be projected, null
      *        if everything from first is to be projected
-     * @param endExpr the last expression to be projected, null 
+     * @param endExpr the last expression to be projected, null
      *        if everything to the end is to be projected
      * @return project expression
-     * @throws ParserValidationException 
+     * @throws ParserValidationException
      */
     LogicalExpression buildRangeProjectExpr(SourceLocation loc, LogicalExpressionPlan plan, LogicalRelationalOperator relOp,
             int input, LogicalExpression startExpr, LogicalExpression endExpr)
     throws ParserValidationException {
-        
+
         if(startExpr == null && endExpr == null){
             // should not reach here as the parser is enforcing this condition
             String msg = "in range project (..) at least one of start or end " +
             "has to be specified. Use project-star (*) instead.";
             throw new ParserValidationException(intStream, loc, msg);
         }
-        
+
         ProjectExpression proj = new ProjectExpression(plan, input, relOp);
 
         //set first column to be projected
@@ -1328,7 +1351,7 @@ public class LogicalPlanBuilder {
         }else{
             proj.setStartCol(0);//project from first column
         }
-        
+
         //set last column to be projected
         if(endExpr != null){
             checkRangeProjectExpr(loc, endExpr);
@@ -1345,7 +1368,7 @@ public class LogicalPlanBuilder {
         }else{
             proj.setEndCol(-1); //project to last column
         }
-        
+
         try {
             if(startExpr != null)
                 plan.removeAndReconnect(startExpr);
@@ -1354,8 +1377,8 @@ public class LogicalPlanBuilder {
         } catch (FrontendException e) {
             throw new ParserValidationException(intStream, loc, e);
         }
-        
-        
+
+
         return proj;
     }
 
@@ -1367,7 +1390,7 @@ public class LogicalPlanBuilder {
             " Found :" + startExpr;
             throw new ParserValidationException(intStream, loc, msg);
         }
-        
+
     }
 
     LogicalExpression buildUDF(SourceLocation loc, LogicalExpressionPlan plan,
@@ -1381,7 +1404,7 @@ public class LogicalPlanBuilder {
         } catch (Exception e) {
             throw new PlanGenerationFailureException(intStream, loc, e);
         }
-        
+
         FuncSpec funcSpec = pigContext.getFuncSpecFromAlias(funcName);
         LogicalExpression le;
         if( funcSpec == null ) {
@@ -1393,11 +1416,11 @@ public class LogicalPlanBuilder {
         } else {
             le = new UserFuncExpression(plan, funcSpec, args, true);
         }
-        
+
         le.setLocation(loc);
         return le;
     }
-    
+
     private long getNextId() {
         return getNextId(scope);
     }
@@ -1405,13 +1428,13 @@ public class LogicalPlanBuilder {
     static LOFilter createNestedFilterOp(LogicalPlan plan) {
         return new LOFilter( plan );
     }
-    
+
     static LOLimit createNestedLimitOp(LogicalPlan plan) {
         return new LOLimit ( plan );
     }
-    
+
     // Build operator for foreach inner plan.
-    Operator buildNestedFilterOp(SourceLocation loc, LOFilter op, LogicalPlan plan, String alias, 
+    Operator buildNestedFilterOp(SourceLocation loc, LOFilter op, LogicalPlan plan, String alias,
             Operator inputOp, LogicalExpressionPlan expr) {
         op.setFilterPlan( expr );
         buildNestedOp( loc, plan, op, alias, inputOp );
@@ -1429,21 +1452,21 @@ public class LogicalPlanBuilder {
         buildNestedOp( loc, plan, op, alias, inputOp );
         return op;
     }
-    
-    Operator buildNestedLimitOp(SourceLocation loc, LOLimit op, LogicalPlan plan, String alias, 
+
+    Operator buildNestedLimitOp(SourceLocation loc, LOLimit op, LogicalPlan plan, String alias,
             Operator inputOp, LogicalExpressionPlan expr) {
         op.setLimitPlan( expr );
         buildNestedOp( loc, plan, op, alias, inputOp );
         return op;
     }
-    
+
     Operator buildNestedCrossOp(SourceLocation loc, LogicalPlan plan, String alias, List<Operator> inputOpList) {
         LOCross op = new LOCross( plan );
         op.setNested(true);
         buildNestedOp( loc, plan, op, alias, inputOpList );
         return op;
     }
-    
+
     private void buildNestedOp(SourceLocation loc, LogicalPlan plan, LogicalRelationalOperator op,
             String alias, Operator inputOp) {
         op.setLocation( loc );
@@ -1451,7 +1474,7 @@ public class LogicalPlanBuilder {
         plan.add( op );
         plan.connect( inputOp, op );
     }
-    
+
     private void buildNestedOp(SourceLocation loc, LogicalPlan plan, LogicalRelationalOperator op,
             String alias, List<Operator> inputOpList) {
         op.setLocation( loc );
@@ -1465,7 +1488,7 @@ public class LogicalPlanBuilder {
     static LOSort createNestedSortOp(LogicalPlan plan) {
         return new LOSort( plan );
     }
-    
+
     /**
      * For any UNKNOWN type in the schema fields, set the type to BYTEARRAY
      * @param sch
@@ -1480,13 +1503,13 @@ public class LogicalPlanBuilder {
             }
         }
     }
-    
+
     static LOForEach createNestedForeachOp(LogicalPlan plan) {
     	return new LOForEach(plan);
     }
-    
+
     Operator buildNestedSortOp(SourceLocation loc, LOSort op, LogicalPlan plan, String alias, Operator inputOp,
-            List<LogicalExpressionPlan> plans, 
+            List<LogicalExpressionPlan> plans,
             List<Boolean> ascFlags, FuncSpec fs) {
         op.setSortColPlans( plans );
         if (ascFlags.isEmpty()) {
@@ -1498,8 +1521,8 @@ public class LogicalPlanBuilder {
         buildNestedOp( loc, plan, op, alias, inputOp );
         return op;
     }
-    
-    Operator buildNestedForeachOp(SourceLocation loc, LOForEach op, LogicalPlan plan, String alias, 
+
+    Operator buildNestedForeachOp(SourceLocation loc, LOForEach op, LogicalPlan plan, String alias,
     		Operator inputOp, LogicalPlan innerPlan)
     throws ParserValidationException
     {
@@ -1507,8 +1530,8 @@ public class LogicalPlanBuilder {
     	buildNestedOp(loc, plan, op, alias, inputOp);
     	return op;
     }
-    
-    Operator buildNestedProjectOp(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach, 
+
+    Operator buildNestedProjectOp(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach,
             Map<String, Operator> operators,
             String alias,
             ProjectExpression projExpr,
@@ -1532,19 +1555,19 @@ public class LogicalPlanBuilder {
             input = new LOInnerLoad( innerPlan, foreach, projExpr.getColNum() );
             input.setLocation( projExpr.getLocation() );
         }
-        
+
         LogicalPlan lp = new LogicalPlan(); // f's inner plan
         LOForEach f = new LOForEach( innerPlan );
         f.setInnerPlan( lp );
         f.setLocation( loc );
         LOGenerate gen = new LOGenerate( lp );
         boolean[] flatten = new boolean[exprPlans.size()];
-        
+
         List<Operator> innerLoads = new ArrayList<Operator>( exprPlans.size() );
         for( LogicalExpressionPlan plan : exprPlans ) {
             ProjectExpression pe = (ProjectExpression)plan.getSinks().get( 0 );
             String al = pe.getColAlias();
-            LOInnerLoad iload = ( al == null ) ?  
+            LOInnerLoad iload = ( al == null ) ?
                     new LOInnerLoad( lp, f, pe.getColNum() ) : createInnerLoad(loc, lp, f, al );
             iload.setLocation( pe.getLocation() );
             pe.setColNum( -1 );
@@ -1552,7 +1575,7 @@ public class LogicalPlanBuilder {
             pe.setAttachedRelationalOp( gen );
             innerLoads.add( iload );
         }
-        
+
         gen.setOutputPlans( exprPlans );
         gen.setFlattenFlags( flatten );
         lp.add( gen );
@@ -1561,7 +1584,7 @@ public class LogicalPlanBuilder {
             lp.add( il );
             lp.connect( il, gen );
         }
-        
+
         // Connect the inner load operators to gen
         setAlias( f, alias );
         innerPlan.add( input );
@@ -1569,10 +1592,10 @@ public class LogicalPlanBuilder {
         innerPlan.connect( input, f );
         return f;
     }
-    
+
     GROUPTYPE parseGroupType(String hint, SourceLocation loc) throws ParserValidationException {
         String modifier = unquote( hint );
-        
+
         if( modifier.equalsIgnoreCase( "collected" ) ) {
             return GROUPTYPE.COLLECTED;
         } else if( modifier.equalsIgnoreCase( "regular" ) ){
@@ -1584,12 +1607,12 @@ public class LogicalPlanBuilder {
                 "Only COLLECTED, REGULAR or MERGE are valid GROUP modifiers." );
         }
     }
-    
+
     JOINTYPE parseJoinType(String hint, SourceLocation loc) throws ParserValidationException {
         String modifier = unquote( hint );
 
         if( modifier.equalsIgnoreCase( "repl" ) || modifier.equalsIgnoreCase( "replicated" ) ) {
-                  return JOINTYPE.REPLICATED; 
+                  return JOINTYPE.REPLICATED;
           } else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) {
                   return LOJoin.JOINTYPE.HASH;
           } else if( modifier.equalsIgnoreCase( "skewed" ) ) {
@@ -1603,7 +1626,7 @@ public class LogicalPlanBuilder {
                       "Only REPL, REPLICATED, HASH, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
           }
     }
-    
+
     void putOperator(String alias, Operator op) {
         operators.put(alias, op);
     }

Modified: pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java?rev=1409562&r1=1409561&r2=1409562&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java Wed Nov 14 23:38:53 2012
@@ -18,24 +18,44 @@
 
 package org.apache.pig.parser;
 
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.io.IOException;
-
-import junit.framework.Assert;
+import java.util.Iterator;
+import java.util.List;
 
 import org.antlr.runtime.MismatchedTokenException;
-import org.antlr.runtime.NoViableAltException;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.test.Util;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 public class TestLogicalPlanGenerator {
     static File command;
-    
+
+    private PigServer pigServer;
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.LOCAL);
+    }
+
     @BeforeClass
     public static void oneTimeSetup() throws IOException, Exception {
-        // Perl script 
-        String[] script = 
+        // Perl script
+        String[] script =
             new String[] {
                           "#!/usr/bin/perl",
                           "open(INFILE,  $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
@@ -47,10 +67,10 @@ public class TestLogicalPlanGenerator {
                          };
         command = Util.createInputFile("script", "pl", script);
     }
-    
+
     @Test
-    public void test1() {
-        String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " + 
+    public void test1() throws Exception {
+        String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
                        "B = limit A 100; " +
                        "C = filter B by 2 > 1; " +
                        "D = load 'y' as (d1, d2); " +
@@ -61,8 +81,8 @@ public class TestLogicalPlanGenerator {
     }
 
     @Test
-    public void test2() {
-        String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " + 
+    public void test2() throws Exception {
+        String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
                        "B = distinct A partition by org.apache.pig.Identity; " +
                        "C = sample B 0.49; " +
                        "D = order C by $0, $1; " +
@@ -76,25 +96,21 @@ public class TestLogicalPlanGenerator {
     }
 
     @Test
-    public void test3() {
-        String query = "a = load '1.txt'  as (name, age, gpa);" + 
+    public void test3() throws Exception {
+        String query = "a = load '1.txt'  as (name, age, gpa);" +
                        "b = group a by name PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner2;" +
                        "c = foreach b generate group, COUNT(a.age);" +
                        "store c into 'y';";
         generateLogicalPlan( query );
     }
-    
-    private void generateLogicalPlan(String query) {
-        try {
-            ParserTestingUtils.generateLogicalPlan( query );
-        } catch(Exception ex) {
-            Assert.fail( "Failed to generate logical plan for query [" + query + "] due to exception: " + ex );
-        }
+
+    private void generateLogicalPlan(String query) throws Exception {
+        ParserTestingUtils.generateLogicalPlan( query );
     }
 
     @Test
-    public void test4() {
-        String query = "A = load 'x'; " + 
+    public void test4() throws Exception {
+        String query = "A = load 'x'; " +
                        "B = mapreduce '" + "myjar.jar" + "' " +
                            "Store A into 'table_testNativeMRJobSimple_input' "+
                            "Load 'table_testNativeMRJobSimple_output' "+
@@ -107,7 +123,7 @@ public class TestLogicalPlanGenerator {
 
     // Test define function.
     @Test
-    public void test5() {
+    public void test5() throws Exception {
         String query = "define myudf org.apache.pig.builtin.PigStorage( ',' );" +
                        "A = load 'x' using myudf;" +
                        "store A into 'y';";
@@ -115,7 +131,7 @@ public class TestLogicalPlanGenerator {
     }
 
     @Test
-    public void test6() {
+    public void test6() throws Exception {
         String query = "A = load 'x' as ( a : int, b, c : chararray );" +
                        "B = group A by ( a, $2 );" +
                        "store B into 'y';";
@@ -123,7 +139,7 @@ public class TestLogicalPlanGenerator {
     }
 
     @Test
-    public void test7() {
+    public void test7() throws Exception {
         String query = "A = load 'x' as ( a : int, b, c : chararray );" +
                        "B = foreach A generate a, $2;" +
                        "store B into 'y';";
@@ -131,16 +147,16 @@ public class TestLogicalPlanGenerator {
     }
 
     @Test
-    public void test8() {
+    public void test8() throws Exception {
         String query = "A = load 'x' as ( a : int, b, c : chararray );" +
                        "B = group A by a;" +
                        "C = foreach B { S = A.b; generate S; };" +
                        "store C into 'y';";
         generateLogicalPlan( query );
     }
-    
+
     @Test
-    public void test9() {
+    public void test9() throws Exception {
         String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
                        "B = foreach A { R = a; S = R.u; T = limit S 100; generate S, T, c + d/5; };" +
                        "store B into 'y';";
@@ -148,7 +164,7 @@ public class TestLogicalPlanGenerator {
     }
 
     @Test
-    public void test10() {
+    public void test10() throws Exception {
         String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
                        "B = foreach A { S = a; T = limit S 100; generate T; };" +
                        "store B into 'y';";
@@ -156,7 +172,7 @@ public class TestLogicalPlanGenerator {
     }
 
     @Test
-    public void test11() {
+    public void test11() throws Exception {
         String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
                        "B = foreach A { T = limit a 100; generate T; };" +
                        "store B into 'y';";
@@ -164,7 +180,7 @@ public class TestLogicalPlanGenerator {
     }
 
     @Test
-    public void test12() {
+    public void test12() throws Exception {
         String query = "define CMD `perl GroupBy.pl '\t' 0 1` ship('"+Util.encodeEscape(command.toString())+"');" +
                        "A = load 'x';" +
                        "B = group A by $0;" +
@@ -176,18 +192,18 @@ public class TestLogicalPlanGenerator {
                        "store E into 'y';";
         generateLogicalPlan( query );
     }
-    
+
     @Test
-    public void test13() {
+    public void test13() throws Exception {
         String query = "define CMD `perl PigStreaming.pl` ship('"+Util.encodeEscape(command.toString())+"') stderr('CMD');" +
                        "A = load 'x';" +
                        "C = stream A through CMD;" +
                        "store C into 'y';";
         generateLogicalPlan( query );
     }
-    
+
     @Test
-    public void test14() {
+    public void test14() throws Exception {
         String query = "a = load 'x1' using PigStorage() as (name, age:int, gpa);" +
                        "b = load 'x2' as (name, age, registration, contributions);" +
                        "e = cogroup a by name, b by name parallel 8;" +
@@ -196,9 +212,9 @@ public class TestLogicalPlanGenerator {
                        "store g into 'y';";
         generateLogicalPlan( query );
     }
-    
+
     @Test
-    public void test15() {
+    public void test15() throws Exception {
         String query = "a = load 'x1' using PigStorage() as (name, age, gpa);" +
                        "b = group a all;" +
                        "c = foreach b generate AVG(a.age) as avg; " +
@@ -209,9 +225,9 @@ public class TestLogicalPlanGenerator {
                        "store y into 'y';";
         generateLogicalPlan( query );
     }
-    
+
     @Test
-    public void test16() {
+    public void test16() throws Exception {
         String query = "AA = load 'x';" +
                        "A = foreach (group (filter AA by $0 > 0) all) generate flatten($1);" +
                        "store A into 'y';";
@@ -219,48 +235,48 @@ public class TestLogicalPlanGenerator {
     }
 
     @Test
-    public void test17() {
+    public void test17() throws Exception {
         String query = "store ( load 'x' ) into 'y';";
         generateLogicalPlan( query );
     }
 
     @Test
-    public void test18() {
+    public void test18() throws Exception {
         String query = "A = load 'x';\n" +
                        "C = group (foreach A generate $0 parallel 5) all;";
         generateLogicalPlan( query );
     }
 
     @Test
-    public void test19() {
+    public void test19() throws Exception {
         String query = "A = load 'x' as (u:map[], v);\n" +
                        "B = foreach A { T = (chararray)u#'hello'#'world'; generate T; };";
         generateLogicalPlan( query );
     }
-    
+
     @Test
-    public void test20() {
+    public void test20() throws Exception {
         String query = "A = load 'x' using PigStorage() as (a:int,b:chararray);\n" +
                        "B = foreach A { C = TOMAP()#'key1'; generate C as C; };";
         generateLogicalPlan( query );
     }
-    
+
     @Test
-    public void test21() {
+    public void test21() throws Exception {
         String query = "A = load 'x' as (u, v);\n" +
                        "B = foreach A { S = u; T = org.apache.pig.builtin.TOMAP(); generate S, T;};";
         generateLogicalPlan( query );
     }
-    
+
     @Test
-    public void test22() {
+    public void test22() throws Exception {
         String query = "A = (load 'x' as (u, v));\n" +
                        "B = (group (foreach A generate $0 parallel 5) all);";
         generateLogicalPlan( query );
     }
-    
+
     @Test
-    public void test23() {
+    public void test23() throws Exception {
         String query = "a = (load 'x1' using PigStorage() as (name, age, gpa));" +
                        "b = (group a all);" +
                        "c = (foreach b generate AVG(a.age) as avg); " +
@@ -271,9 +287,9 @@ public class TestLogicalPlanGenerator {
                        "store y into 'y';";
         generateLogicalPlan( query );
     }
-    
+
     @Test
-    public void test24() {
+    public void test24() throws Exception {
         String query = "a = (load 'x1' using PigStorage() as (name, age:int, gpa));" +
                        "b = (load 'x2' as (name, age, registration, contributions));" +
                        "e = (cogroup a by name, b by name parallel 8);" +
@@ -282,10 +298,10 @@ public class TestLogicalPlanGenerator {
                        "(store g into 'y');";
         generateLogicalPlan( query );
     }
-    
+
     @Test
-    public void test25() {
-        String query = "A = (load 'x' as ( u:int, v:long, w:bytearray)); " + 
+    public void test25() throws Exception {
+        String query = "A = (load 'x' as ( u:int, v:long, w:bytearray)); " +
                        "B = (distinct A partition by org.apache.pig.Identity); " +
                        "C = (sample B 0.49); " +
                        "D = (order C by $0, $1); " +
@@ -297,48 +313,48 @@ public class TestLogicalPlanGenerator {
                        "L = (store J into 'output');";
         generateLogicalPlan( query );
     }
-    
+
+    @Test
+    public void testCubeBasic() throws Exception {
+      	String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
+      	        + "b = cube a by cube(x,y);"
+      	        + "c = foreach b generate flatten(group) as (x,y), COUNT(cube) as count, SUM(cube.z) as total;"
+      	        + "store c into 'output';";
+      	generateLogicalPlan(query);
+    }
+
+    @Test
+    public void testCubeMultipleIAlias() throws Exception {
+      	String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
+      	        + "a = load 'input' as (x,y:chararray,z:long);"
+      	        + "a = load 'input' as (x:chararray,y:chararray,z:long);"
+      	        + "b = cube a by rollup(x,y);"
+      	        + "c = foreach b generate flatten(group) as (x,y), COUNT(cube) as count, SUM(cube.z) as total;"
+      	        + "store c into 'c';";
+      	generateLogicalPlan(query);
+    }
+
     @Test
-    public void testCubeBasic() {
-	String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
-	        + "b = cube a by cube(x,y);"
-	        + "c = foreach b generate flatten(group) as (x,y), COUNT(cube) as count, SUM(cube.z) as total;"
-	        + "store c into 'output';";
-	generateLogicalPlan(query);
-    }
-    
-    @Test
-    public void testCubeMultipleIAlias() {
-	String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
-	        + "a = load 'input' as (x,y:chararray,z:long);"
-	        + "a = load 'input' as (x:chararray,y:chararray,z:long);"
-	        + "b = cube a by rollup(x,y);"
-	        + "c = foreach b generate flatten(group) as (x,y), COUNT(cube) as count, SUM(cube.z) as total;"
-	        + "store c into 'c';";
-	generateLogicalPlan(query);
-    }
-    
-    @Test
-    public void testCubeAfterForeach() {
-	String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
-	        + "b = foreach a generate x as type,y as location,z as number;"
-	        + "c = cube b by cube(type,location);"
-	        + "d = foreach c generate flatten(group) as (type,location), COUNT(cube) as count, SUM(cube.number) as total;"
-	        + "store d into 'd';";
-	generateLogicalPlan(query);
+    public void testCubeAfterForeach() throws Exception {
+      	String query = "a = load 'input' as (x:chararray,y:chararray,z:long);"
+      	        + "b = foreach a generate x as type,y as location,z as number;"
+      	        + "c = cube b by cube(type,location);"
+      	        + "d = foreach c generate flatten(group) as (type,location), COUNT(cube) as count, SUM(cube.number) as total;"
+      	        + "store d into 'd';";
+      	generateLogicalPlan(query);
     }
-    
+
     @Test
-    public void testFilter() {
-        String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " + 
+    public void testFilter() throws Exception {
+        String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " +
                        "B = filter A by 2 > 1;\n" +
                        "store B into 'y';";
         generateLogicalPlan( query );
     }
 
     @Test
-    public void testScopedAlias() {
-        String query = "A = load 'x' as ( u:int, v:long, w:bytearray);" + 
+    public void testScopedAlias() throws Exception {
+        String query = "A = load 'x' as ( u:int, v:long, w:bytearray);" +
                        "B = load 'y' as ( u:int, x:int, y:chararray);" +
                        "C = join A by u, B by u;" +
                        "D = foreach C generate A::u, B::u, v, x;" +
@@ -347,93 +363,83 @@ public class TestLogicalPlanGenerator {
     }
 
     @Test
-    public void testConstantWithNegativeValue() {
-        String query = "A = load 'x' as ( u:int, v:long, w:bytearray);" + 
+    public void testConstantWithNegativeValue() throws Exception {
+        String query = "A = load 'x' as ( u:int, v:long, w:bytearray);" +
                        "B = foreach A generate u, { ( -1, -15L, -3.5, -4.03F, -2.3e3 ) };" +
                        "store B into 'y';";
         generateLogicalPlan ( query );
     }
 
-    @Test
-    public void testNegative1() {
+    @Test(expected = NonProjectExpressionException.class)
+    public void testNegative1() throws Exception {
         String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
                        "B = foreach A { S = c * 2; T = limit S 100; generate T; };" +
                        "store B into 'y';";
-        try {
-            ParserTestingUtils.generateLogicalPlan( query );
-        } catch(Exception ex) {
-            Assert.assertTrue( ex instanceof NonProjectExpressionException );
-            return;
-        }
-        Assert.fail( "Query is supposed to be failing." );
+        ParserTestingUtils.generateLogicalPlan( query );
     }
-    
-    @Test
-    public void testNegative2() {
-    	String query = "ship = load 'x';";
-    	try {
+
+    @Test(expected = MismatchedTokenException.class)
+    public void testNegative2() throws Exception {
+      	String query = "ship = load 'x';";
+      	try {
             ParserTestingUtils.generateLogicalPlan( query );
         } catch(Exception ex) {
-            Assert.assertTrue( ex instanceof MismatchedTokenException );
             MismatchedTokenException mex = (MismatchedTokenException)ex;
-            Assert.assertTrue( mex.token.getText().equals("ship") );
-            return;
+            assertTrue( mex.token.getText().equals("ship") );
+            throw ex;
         }
-        Assert.fail( "Query is supposed to be failing." );
     }
 
-    @Test
-    public void testNegative3() {
+    @Test(expected = MismatchedTokenException.class)
+    public void testNegative3() throws Exception {
     	String query = "A = load 'y'; all = load 'x';";
     	try {
             ParserTestingUtils.generateLogicalPlan( query );
         } catch(Exception ex) {
-            Assert.assertTrue( ex instanceof MismatchedTokenException );
             MismatchedTokenException mex = (MismatchedTokenException)ex;
-            Assert.assertTrue( mex.token.getText().equals("all") );
-            return;
+            assertTrue( mex.token.getText().equals("all") );
+            throw ex;
         }
-        Assert.fail( "Query is supposed to be failing." );
     }
 
     @Test
-    public void testMultilineFunctionArgument() {
+    public void testMultilineFunctionArgument() throws Exception {
         String query = "LOAD 'testIn' \n" +
             "USING PigStorage ('\n');";
         generateLogicalPlan( query );
     }
-    
+
     @Test
     // See PIG-2320
-    public void testInlineOpInGroup() {
+    public void testInlineOpInGroup() throws Exception {
         String query = "a = load 'data1' as (x:int); \n" +
             "a_1 = filter (group a by x) by COUNT(a) > 0;";
         generateLogicalPlan( query );
     }
 
     @Test
-    public void testRank01() {
+    public void testRank01() throws Exception {
         String query = "A = LOAD 'data4' AS (name:chararray,surname:chararray,sales:double,code:int);"
             + "B = rank A by sales;" + "store B into 'rank01_test';";
         generateLogicalPlan(query);
     }
 
     @Test
-    public void testRank02() {
+    public void testRank02() throws Exception {
         String query = "A = LOAD 'data4' AS (name:chararray,surname:chararray,sales:double,code:int);"
             + "C = rank A by sales DENSE;" + "store C into 'rank02_test';";
         generateLogicalPlan(query);
     }
 
     @Test
-    public void testRank03() {
+    public void testRank03() throws Exception {
         String query = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);"
             + "B = rank A;" + "store B into 'rank03_test';";
         generateLogicalPlan(query);
     }
 
     @Test
-    public void testRank04() {
+    public void testRank04() throws Exception {
         String query = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);"
             + "C = rank A by postalcode DESC;"
             + "store C into 'rank04_test';";
@@ -441,7 +447,7 @@ public class TestLogicalPlanGenerator {
     }
 
     @Test
-    public void testRank05() {
+    public void testRank05() throws Exception {
         String query = "A = load 'test02' using PigStorage(',') as (firstname:chararray,lastname:chararray,rownumberPrev:int,rankPrev:int,denserankPrev:int,quartilePrev:int,sales:double,postalcode:int);"
             + "D = rank A by postalcode DENSE;"
             + "store D into 'rank05_test';";
@@ -449,23 +455,61 @@ public class TestLogicalPlanGenerator {
     }
 
     @Test
-    public void testRank06() {
+    public void testRank06() throws Exception {
         String query = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);"
             + "C = rank A by x..rz;";
         generateLogicalPlan(query);
     }
 
     @Test
-    public void testRank07() {
+    public void testRank07() throws Exception {
         String query = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);"
             + "C = rank A by x ASC, y DESC;";
         generateLogicalPlan(query);
     }
 
     @Test
-    public void testRank08() {
+    public void testRank08() throws Exception {
         String query = "A = load 'data' as (x:int,y:chararray,z:int,rz:chararray);"
             + "C = rank A;";
         generateLogicalPlan(query);
     }
+
+    // See: PIG-2937
+    @Test
+    public void testRelationAliasInNestedForeachWhereUnspecified() throws Exception {
+        Data data = resetData(pigServer);
+        List<Tuple> values = Lists.newArrayList();
+        for (int i = 0; i < 10; i++) {
+          values.add(tuple(i % 3 == 0 ? null : "a", "b"));
+        }
+        data.set("foo", values);
+        pigServer.registerQuery("raw_data = load 'foo' using mock.Storage() as (field_a:chararray, field_b:chararray);");
+        pigServer.registerQuery("records = foreach raw_data {" +
+            "  generated_field = (field_a is null ? '-' : field_b);"+
+            "  GENERATE" +
+            "    field_a," +
+            "    field_b," +
+            "    generated_field; }");
+        pigServer.registerQuery("use_records = foreach records generate generated_field, CONCAT(generated_field,generated_field);");
+        Schema expectedSchema = Utils.getSchemaFromString("field_a:chararray, field_b:chararray, generated_field:chararray");
+        assertEquals(expectedSchema, pigServer.dumpSchema("records"));
+        for (Iterator<Tuple> it = pigServer.openIterator("records"); it.hasNext();) {
+            Tuple t = it.next();
+            String a = (String)t.get(0);
+            String b = (String)t.get(1);
+            assertEquals("b", b);
+            if (a == null) {
+                assertEquals("-", t.get(2));
+            } else {
+                assertEquals("a", a);
+                assertEquals(b, t.get(2));
+            }
+        }
+        for (Iterator<Tuple> it = pigServer.openIterator("use_records"); it.hasNext();) {
+            Tuple t = it.next();
+            String x = (String)t.get(0);
+            assertEquals(x+x, t.get(1));
+        }
+    }
 }