You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/05/12 23:14:01 UTC

svn commit: r1102461 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/builtin/ src/org/apache/pig/newplan/logical/optimizer/ src/org/apache/pig/newplan/logic...

Author: thejas
Date: Thu May 12 21:14:00 2011
New Revision: 1102461

URL: http://svn.apache.org/viewvc?rev=1102461&view=rev
Log:
PIG-1938: support project-range as udf argument

Added:
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjStarInUdfExpander.java
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpanderUtil.java
    pig/trunk/test/org/apache/pig/test/TestProjectStarRangeInUdf.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/trunk/src/org/apache/pig/builtin/TOBAG.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java
    pig/trunk/src/org/apache/pig/parser/AstPrinter.g
    pig/trunk/src/org/apache/pig/parser/AstValidator.g
    pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
    pig/trunk/src/org/apache/pig/parser/QueryParser.g
    pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
    pig/trunk/test/org/apache/pig/test/TestProjectRange.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu May 12 21:14:00 2011
@@ -42,6 +42,8 @@ PIG-2011: Speed up TestTypedMap.java (dv
 
 BUG FIXES
 
+PIG-1938: support project-range as udf argument (thejas)
+
 PIG-2048: Add zookeeper to pig jar (gbowyer via gates)
 
 PIG-2008: Cache outputFormat in HBaseStorage (thedatachef via gates)

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Thu May 12 21:14:00 2011
@@ -90,6 +90,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.newplan.logical.visitor.CastLineageSetter;
 import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
+import org.apache.pig.newplan.logical.visitor.ProjStarInUdfExpander;
 import org.apache.pig.newplan.logical.visitor.ScalarVisitor;
 import org.apache.pig.newplan.logical.visitor.SchemaAliasVisitor;
 import org.apache.pig.newplan.logical.visitor.TypeCheckingRelVisitor;
@@ -1665,6 +1666,7 @@ public class PigServer {
         }
         
         private void compile(LogicalPlan lp) throws FrontendException  {
+            new ProjStarInUdfExpander(lp).visit();
             new ColumnAliasConversionVisitor( lp ).visit();
             new SchemaAliasVisitor( lp ).visit();
             new ScalarVisitor( lp, pigContext ).visit();

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu May 12 21:14:00 2011
@@ -164,7 +164,7 @@ public class POUserFunc extends Expressi
                 if(op instanceof POProject &&
                         op.getResultType() == DataType.TUPLE){
                     POProject projOp = (POProject)op;
-                    if(projOp.isStar()){
+                    if(projOp.isProjectToEnd()){
                         Tuple trslt = (Tuple) temp.result;
                         Tuple rslt = (Tuple) res.result;
                         for(int i=0;i<trslt.size();i++) {

Modified: pig/trunk/src/org/apache/pig/builtin/TOBAG.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TOBAG.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TOBAG.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/TOBAG.java Thu May 12 21:14:00 2011
@@ -99,21 +99,22 @@ public class TOBAG extends EvalFunc<Data
      * 
      */
     @Override
-    public Schema outputSchema(Schema input) {
+    public Schema outputSchema(Schema inputSch) {
         byte type = DataType.ERROR;
         Schema innerSchema = null;
-        
-        for(FieldSchema fs : input.getFields()){
-         if(type == DataType.ERROR){
-             type = fs.type;
-             innerSchema = fs.schema;
-         }else{
-             if( type != fs.type || !nullEquals(innerSchema, fs.schema)){
-                 // invalidate the type
-                 type = DataType.ERROR;
-                 break;
-             }
-         }
+        if(inputSch != null){
+            for(FieldSchema fs : inputSch.getFields()){
+                if(type == DataType.ERROR){
+                    type = fs.type;
+                    innerSchema = fs.schema;
+                }else{
+                    if( type != fs.type || !nullEquals(innerSchema, fs.schema)){
+                        // invalidate the type
+                        type = DataType.ERROR;
+                        break;
+                    }
+                }
+            }
         }
         try {
             if(type == DataType.ERROR){

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java Thu May 12 21:14:00 2011
@@ -56,8 +56,10 @@ public class ProjectionPatcher implement
         
         @Override
         public void visit(ProjectExpression p) throws FrontendException {
-            // if projection is for everything, just return
-            if (p.isProjectStar()) {
+            // if the project is a project-star or project-range, i.e. it could
+            // not be expanded, then it is not possible to determine the matching
+            // input columns before runtime
+            if (p.isRangeOrStarProject()) {
                 return;
             }
             

Added: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjStarInUdfExpander.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjStarInUdfExpander.java?rev=1102461&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjStarInUdfExpander.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjStarInUdfExpander.java Thu May 12 21:14:00 2011
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.visitor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.pig.PigException;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
+import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+
+/**
+ * Expand project-star or project-range when used as udf argument.
+ * This is different from {@link ProjectStarExpander} because in those 
+ * cases, the project star gets expanded as new {@link LogicalExpressionPlan}.
+ * In case of project-star or project-range within udf, it should get expanded
+ * only as multiple inputs to this udf, no additional {@link LogicalExpressionPlan}s
+ * are created.
+ * The expansion happens only if input schema is not null
+ */
+public class ProjStarInUdfExpander extends AllExpressionVisitor {
+
+    public ProjStarInUdfExpander(OperatorPlan plan) throws FrontendException {
+        super( plan, new DependencyOrderWalker( plan ) );
+    }
+
+    @Override
+    protected LogicalExpressionVisitor getVisitor(final LogicalExpressionPlan exprPlan)
+    throws FrontendException {
+        //This handles the expansion of udf arguments in all operators other than foreach
+        return new ProjExpanderForNonForeach(exprPlan);
+    }  
+
+
+    /* 
+     * LOForeach needs special handling because LOInnerLoad's inner ProjectExpression
+     * is the one that gets expanded
+     */
+    @Override
+    public void visit(LOForEach foreach) throws FrontendException{
+        LogicalPlan innerPlan = foreach.getInnerPlan();
+
+        //visit the inner plan first
+        PlanWalker newWalker = currentWalker.spawnChildWalker(innerPlan);
+        pushWalker(newWalker);
+        currentWalker.walk(this);
+        popWalker();
+
+        //get the LOGenerate
+        List<Operator> feOutputs = innerPlan.getSinks();
+        LOGenerate gen = null;
+        for( Operator op  : feOutputs){
+            if(op instanceof LOGenerate){
+                if(gen != null){
+                    String msg = "Expected single LOGenerate output in innerplan of foreach";
+                    throw new VisitorException(foreach,
+                            msg,
+                            2266,
+                            PigException.BUG
+                    );
+                }
+                gen = (LOGenerate) op;
+            }
+        }
+
+        List<Operator> loGenPreds = innerPlan.getPredecessors(gen);
+
+        if(loGenPreds == null){
+            // there are no LOInnerLoads , must be working on just constants
+            // no project-star expansion to be done
+            return;
+        }
+
+        //get mapping of LOGenerate predecessor current position to object
+        Map<Integer, LogicalRelationalOperator> oldPos2Rel =
+            new HashMap<Integer, LogicalRelationalOperator>();
+        
+        for(int i=0; i<loGenPreds.size(); i++){
+            oldPos2Rel.put(i, (LogicalRelationalOperator) loGenPreds.get(i));
+        }
+
+        //store mapping between the projections in the inner plans
+        // of LOGenerate and the input relation object
+        Map<ProjectExpression, LogicalRelationalOperator> proj2InpRel =
+            new HashMap<ProjectExpression, LogicalRelationalOperator>();
+        
+        List<LOInnerLoad> expandedInLoads = new ArrayList<LOInnerLoad>();
+        
+        //visit each expression plan, and expand the projects in the udf
+        for( OperatorPlan plan : gen.getOutputPlans()){
+            ProjExpanderForForeach projExpander = new ProjExpanderForForeach(
+                    plan,
+                    gen,
+                    oldPos2Rel,
+                    proj2InpRel,
+                    foreach,
+                    expandedInLoads
+            );
+            projExpander.visit();
+        }
+        
+        //remove the LOInnerLoads that have been expanded
+        for(LOInnerLoad inLoad : expandedInLoads){
+            innerPlan.disconnect(inLoad, gen);
+            innerPlan.remove(inLoad);
+        }
+        
+        //reset the input relation position in the projects
+        //get mapping of LoGenerate input relation to current position
+        Map<LogicalRelationalOperator, Integer> rel2pos = new HashMap<LogicalRelationalOperator, Integer>();
+        List<Operator> newGenPreds = innerPlan.getPredecessors(gen);
+        int numNewGenPreds = 0;
+        if(newGenPreds != null)
+            numNewGenPreds = newGenPreds.size();
+            
+        for(int i=0; i<numNewGenPreds; i++){
+            rel2pos.put((LogicalRelationalOperator) newGenPreds.get(i),i);
+        }
+        
+        //correct the input num for projects
+        for(Entry<ProjectExpression, LogicalRelationalOperator> projAndInp : proj2InpRel.entrySet()){
+           ProjectExpression proj = projAndInp.getKey();
+           LogicalRelationalOperator rel = projAndInp.getValue();
+           proj.setInputNum(rel2pos.get(rel));
+        }
+    }
+
+    @Override
+    public void visit(LOGenerate gen) throws FrontendException{
+        
+    }
+}
+
+class ProjExpanderForForeach extends LogicalExpressionVisitor{
+
+    private LOGenerate loGen;
+    private LogicalPlan innerRelPlan;
+    private Map<Integer, LogicalRelationalOperator> oldPos2Rel;
+    private Map<ProjectExpression, LogicalRelationalOperator> proj2InpRel;
+    private LOForEach foreach;
+    private List<LOInnerLoad> expandedInLoads;
+
+    protected ProjExpanderForForeach(
+            OperatorPlan p, 
+            LOGenerate loGen, 
+            Map<Integer, LogicalRelationalOperator> oldPos2Rel,
+            Map<ProjectExpression, LogicalRelationalOperator> proj2InpRel,
+            LOForEach foreach,
+            List<LOInnerLoad> expandedInLoads
+    )
+    throws FrontendException {
+        super(p, new ReverseDependencyOrderWalker(p));
+        this.loGen = loGen;
+        this.innerRelPlan = (LogicalPlan) loGen.getPlan();
+        this.oldPos2Rel = oldPos2Rel;
+        this.proj2InpRel = proj2InpRel;
+        this.foreach = foreach;
+        this.expandedInLoads = expandedInLoads;
+        
+    }
+    
+    @Override
+    public void visit(UserFuncExpression func) throws FrontendException{
+        if(plan.getSuccessors(func) == null){
+            // no args for the udf, so nothing to do
+            return;
+        }
+        List<Operator> inputs = new ArrayList<Operator>(plan.getSuccessors(func));
+
+        // expandedProjectStars will be removed from plan
+        List<Operator> expandedProjectStars = new ArrayList<Operator>();
+
+        // new projects to be added to the plan
+        List<Operator> newExpandedProjects =  new ArrayList<Operator>();
+
+        //new set of inputs 
+        List<Operator> newInputs = new ArrayList<Operator>();
+        
+
+        
+        for(Operator inp  : inputs){
+            if(inp instanceof ProjectExpression && ((ProjectExpression)inp).isRangeOrStarProject() 
+                        && oldPos2Rel.get(((ProjectExpression)inp).getInputNum()) instanceof LOInnerLoad
+            ){
+                //under foreach the ProjectExpression is always a project-star, 
+                // need to check if the input relation is a LOInnerLoad 
+                // containing project-star/range
+                LOInnerLoad inLoad = 
+                    (LOInnerLoad)oldPos2Rel.get(((ProjectExpression)inp).getInputNum());
+                
+                ProjectExpression innerProj = inLoad.getProjection();
+                if(!innerProj.isRangeOrStarProject()){
+                    newInputs.add(inp);
+                    continue;
+                }
+                
+                //try expanding the project-star/range
+                List<Operator> expandedOps = expandProjectStar(innerProj);
+                if(expandedOps != null){
+                    expandedProjectStars.add(inp);//to remove
+                    expandedInLoads.add(inLoad);//to remove
+                    newInputs.addAll(expandedOps); 
+                    newExpandedProjects.addAll(expandedOps);//to add
+                }else {
+                    newInputs.add(inp);
+                }
+            }else{
+                newInputs.add(inp);
+            }
+        }
+        
+        //make changes to the plan if there is a project that was expanded
+        if(expandedProjectStars.size() > 0){
+
+            //disconnect old inputs
+            for(Operator inp : inputs){
+                plan.disconnect(func, inp);
+            }
+
+            //remove expanded projects
+            for(Operator op : expandedProjectStars){
+                plan.remove(op);
+                proj2InpRel.remove(op);
+            }
+
+            //add new projects
+            for(Operator op : newExpandedProjects){
+                plan.add(op);
+            }
+
+            //connect new inputs
+            for(Operator newInp : newInputs){
+                plan.connect(func, newInp);
+            }
+        }
+        
+        
+    }
+    
+    @Override
+    public void visit(ProjectExpression proj){
+        //add project to LOInnerLoad mapping so that the input number can be
+        //corrected later
+        proj2InpRel.put(proj, oldPos2Rel.get(proj.getInputNum()));
+    }
+
+    private List<Operator> expandProjectStar(ProjectExpression proj)
+    throws FrontendException {
+        Pair<Integer, Integer> firstLastCols =
+            ProjectStarExpanderUtil.getProjectStartEndCols((LogicalExpressionPlan)plan, proj);
+
+        if(firstLastCols == null){
+            //no expansion happening now
+            return null;
+        }
+        
+        //expand from firstProjCol to lastProjCol 
+        int firstProjCol = firstLastCols.first;
+        int lastProjCol = firstLastCols.second;
+
+
+        List<Operator> newProjects = new ArrayList<Operator>();
+        for(int i = firstProjCol; i <= lastProjCol; i++){
+            LOInnerLoad newILoad = new LOInnerLoad(innerRelPlan, foreach, i);
+            innerRelPlan.add(newILoad);
+            innerRelPlan.connect(newILoad, loGen);
+            ProjectExpression newProj = new ProjectExpression(plan, -2, -1, loGen) ;
+            proj2InpRel.put(newProj, newILoad);
+            newProjects.add(newProj);
+        } 
+
+        return newProjects;
+        
+    }
+}
+
+class ProjExpanderForNonForeach extends LogicalExpressionVisitor{
+
+    protected ProjExpanderForNonForeach(OperatorPlan p)
+    throws FrontendException {
+        super(p, new ReverseDependencyOrderWalker(p));
+    }
+
+    @Override
+    public void visit(UserFuncExpression func) throws FrontendException {
+        if(plan.getSuccessors(func) == null){
+            //udf without args, nothing to do 
+            return;
+        }
+        List<Operator> inputs = new ArrayList<Operator>(plan.getSuccessors(func));
+
+        // expandedProjectStars will be removed from plan
+        List<Operator> expandedProjectStars = new ArrayList<Operator>();
+
+        // new projects to be added to the plan
+        List<Operator> newExpandedProjects =  new ArrayList<Operator>();
+
+        //new set of inputs 
+        List<Operator> newInputs = new ArrayList<Operator>();
+
+        for(Operator inp  : inputs){
+            if(inp instanceof ProjectExpression && ((ProjectExpression)inp).isRangeOrStarProject() ){
+                //try expanding the project-star/range
+                List<Operator> expandedOps = expandProjectStar((ProjectExpression)inp);
+                if(expandedOps != null){
+                    expandedProjectStars.add(inp);//to remove
+                    newInputs.addAll(expandedOps); 
+                    newExpandedProjects.addAll(expandedOps);//to add
+                }else {
+                    newInputs.add(inp);
+                }
+            }else{
+                newInputs.add(inp);
+            }
+        }
+
+
+        //make changes to the plan if there is a project that was expanded
+        if(expandedProjectStars.size() > 0){
+
+            //disconnect old inputs
+            for(Operator inp : inputs){
+                plan.disconnect(func, inp);
+            }
+
+            //remove expanded projects
+            for(Operator op : expandedProjectStars){
+                plan.remove(op);
+            }
+
+            //add new projects
+            for(Operator op : newExpandedProjects){
+                plan.add(op);
+            }
+
+            //connect new inputs
+            for(Operator newInp : newInputs){
+                plan.connect(func, newInp);
+            }
+        }
+
+    }
+
+    private List<Operator> expandProjectStar(
+            ProjectExpression proj) throws FrontendException {
+
+        Pair<Integer, Integer> firstLastCols =
+            ProjectStarExpanderUtil.getProjectStartEndCols((LogicalExpressionPlan)plan, proj);
+
+
+        if(firstLastCols == null){
+            //no expansion happening now
+            return null;
+        }
+        //expand from firstProjCol to lastProjCol 
+        int firstProjCol = firstLastCols.first;
+        int lastProjCol = firstLastCols.second;
+
+
+        List<Operator> newProjects = new ArrayList<Operator>();
+        LogicalRelationalOperator relOp = proj.getAttachedRelationalOp();
+        for(int i = firstProjCol; i <= lastProjCol; i++){
+            newProjects.add(new ProjectExpression(plan, proj.getInputNum(), i, relOp));
+        } 
+
+        return newProjects;
+    }
+
+
+};
\ No newline at end of file

Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java Thu May 12 21:14:00 2011
@@ -27,6 +27,7 @@ import org.apache.pig.PigException;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.newplan.DependencyOrderWalker;
 import org.apache.pig.newplan.DepthFirstWalker;
 import org.apache.pig.newplan.Operator;
@@ -488,46 +489,22 @@ public class ProjectStarExpander extends
     private List<LogicalExpressionPlan> expandPlan(
             LogicalExpressionPlan expPlan, ProjectExpression proj, int inputNum)
             throws FrontendException {
-        LogicalRelationalOperator relOp = proj.getAttachedRelationalOp();
-
-        // list of inputs of attached relation
-        List<Operator> inputRels = relOp.getPlan().getPredecessors(relOp);
-
-        //the relation that is input to this project 
-        LogicalRelationalOperator inputRel =
-            (LogicalRelationalOperator) inputRels.get(proj.getInputNum());
-
+        
+        Pair<Integer, Integer> startAndEndProjs =
+            ProjectStarExpanderUtil.getProjectStartEndCols(expPlan, proj);  
         List<LogicalExpressionPlan> newPlans = new ArrayList<LogicalExpressionPlan>();
 
-        LogicalSchema inputSchema = inputRel.getSchema();
-        if(inputSchema == null && 
-                (proj.isProjectStar() || (proj.isRangeProject() && proj.getEndCol() == -1))
-        ){
-            // can't expand if input schema is null and it is a project-star
-            // or project-range-until-end
+        if(startAndEndProjs == null){
+            // can't expand this project
             newPlans.add(expPlan);
             return newPlans;
         }
 
-        //expand from firstProjCol to lastProjCol after setting their values
-        int firstProjCol;
-        int lastProjCol;
-
-        //the range values are set in the project in LOInnerLoad
-        if(proj.isRangeProject()){
-            proj.setColumnNumberFromAlias();
-            firstProjCol = proj.getStartCol();
-            
-            if(proj.getEndCol() >= 0)
-                lastProjCol = proj.getEndCol();
-            else
-                lastProjCol = inputSchema.size() - 1;
-        }else{
-            //project-star
-            firstProjCol = 0;
-            lastProjCol = inputSchema.size() - 1;
-        }
-        
+        //expand from firstProjCol to lastProjCol 
+        int firstProjCol = startAndEndProjs.first;
+        int lastProjCol = startAndEndProjs.second;
+
+        LogicalRelationalOperator relOp = proj.getAttachedRelationalOp();
         for(int i = firstProjCol; i <= lastProjCol; i++){
             newPlans.add(createExpPlanWithProj(relOp, inputNum, i));
         }

Added: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpanderUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpanderUtil.java?rev=1102461&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpanderUtil.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ProjectStarExpanderUtil.java Thu May 12 21:14:00 2011
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.visitor;
+
+import java.util.List;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+
+/**
+ * Util function(s) for project-(star/range) expansion
+ */
+public class ProjectStarExpanderUtil{
+
+    /**
+     * If the argument project is a project-star or project-range that
+     * can be expanded, find the position of first and last columns 
+     * it should project  
+     * @param expPlan
+     * @param proj
+     * @return pair that has the first and last columns that need to be projected 
+     * @throws FrontendException
+     */
+    static Pair<Integer, Integer> getProjectStartEndCols(
+            LogicalExpressionPlan expPlan, ProjectExpression proj)
+            throws FrontendException {
+        
+        // get the input schema first
+        
+        LogicalRelationalOperator relOp = proj.getAttachedRelationalOp();
+
+        // list of inputs of attached relation
+        List<Operator> inputRels = relOp.getPlan().getPredecessors(relOp);
+
+        //the relation that is input to this project 
+        LogicalRelationalOperator inputRel =
+            (LogicalRelationalOperator) inputRels.get(proj.getInputNum());
+
+        LogicalSchema inputSchema = inputRel.getSchema();
+        
+        
+        if(inputSchema == null && 
+                (proj.isProjectStar() || (proj.isRangeProject() && proj.getEndCol() == -1))
+        ){
+            // can't expand if input schema is null and it is a project-star
+            // or project-range-until-end
+            return null;
+        }
+
+        //find first and last column in input schema to be projected
+        int firstProjCol;
+        int lastProjCol;
+
+        //the range values are set in the project in LOInnerLoad
+        if(proj.isRangeProject()){
+            proj.setColumnNumberFromAlias();
+            firstProjCol = proj.getStartCol();
+            
+            if(proj.getEndCol() >= 0)
+                lastProjCol = proj.getEndCol();
+            else
+                lastProjCol = inputSchema.size() - 1;
+        }else{
+            //project-star
+            firstProjCol = 0;
+            lastProjCol = inputSchema.size() - 1;
+        }
+        return new Pair<Integer, Integer>(firstProjCol, lastProjCol);
+
+    }
+
+
+
+}

Modified: pig/trunk/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstPrinter.g?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstPrinter.g Thu May 12 21:14:00 2011
@@ -218,7 +218,7 @@ rel 
 ;
 
 flatten_generated_item 
-    : ( flatten_clause | expr | STAR { sb.append(" ").append($STAR.text); } ) ( { sb.append(" AS "); } field_def_list)?
+    : ( flatten_clause | col_range | expr | STAR { sb.append(" ").append($STAR.text); } ) ( { sb.append(" AS "); } field_def_list)?
 ;
 
 flatten_clause 
@@ -248,7 +248,7 @@ func_eval
 ;
 
 real_arg 
-    : expr | STAR { sb.append($STAR.text); }
+    : expr | STAR { sb.append($STAR.text); } | col_range
 ;
 
 expr 
@@ -302,6 +302,9 @@ col_index 
     : DOLLARVAR { sb.append($DOLLARVAR.text); }
 ;
 
+col_range :  ^(COL_RANGE col_ref? { sb.append(".."); } DOUBLE_PERIOD col_ref?)
+;
+
 pound_proj 
     : ^( POUND { sb.append($POUND.text); }
         ( QUOTEDSTRING { sb.append($QUOTEDSTRING.text); } | NULL { sb.append($NULL.text); } ) )
@@ -332,7 +335,7 @@ order_by_clause 
 ;
 
 order_col 
-    : col_ref ( ASC { sb.append(" ").append($ASC.text); } | DESC { sb.append(" ").append($DESC.text); } )?    
+    : (col_range | col_ref) ( ASC { sb.append(" ").append($ASC.text); } | DESC { sb.append(" ").append($DESC.text); } )?    
 ;
 
 distinct_clause 
@@ -377,7 +380,7 @@ join_group_by_clause
 ;
 
 join_group_by_expr 
-    : expr | STAR { sb.append($STAR.text); }
+    : col_range | expr | STAR { sb.append($STAR.text); }
 ;
 
 union_clause 

Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Thu May 12 21:14:00 2011
@@ -278,7 +278,7 @@ cond : ^( OR cond cond )
 func_eval: ^( FUNC_EVAL func_name real_arg* )
 ;
 
-real_arg : expr | STAR
+real_arg : expr | STAR | col_range
 ;
 
 expr : ^( PLUS expr expr )

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Thu May 12 21:14:00 2011
@@ -653,6 +653,7 @@ real_arg [LogicalExpressionPlan plan] re
        $expr = builder.buildProjectExpr( new SourceLocation( (PigParserNode)$STAR ), $plan, $GScope::currentOp, 
            $statement::inputIndex, null, -1 );
    }
+ | cr = col_range[$plan] { $expr = $cr.expr;}
 ;
 
 expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]

Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Thu May 12 21:14:00 2011
@@ -386,7 +386,7 @@ real_arg_list : real_arg ( COMMA real_ar
              -> real_arg+
 ;
 
-real_arg : expr | STAR
+real_arg : expr | STAR | col_range
 ;
 
 null_check_cond : expr IS! NOT? NULL^

Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java Thu May 12 21:14:00 2011
@@ -226,7 +226,7 @@ public class TestNewPlanFilterAboveForea
     public void test2() throws Exception {
         String query = "A =LOAD 'file.txt' AS (a:(u,v), b, c);" +
         "B = FOREACH A GENERATE $0, b;" +
-        "C = FILTER B BY " + SIZE.class.getName() +"(*) > 5;" +
+        "C = FILTER B BY " + SIZE.class.getName() +"(TOTUPLE(*)) > 5;" +
         "STORE C INTO 'empty';";  
         LogicalPlan newLogicalPlan = buildPlan( query );
 
@@ -271,7 +271,7 @@ public class TestNewPlanFilterAboveForea
     public void test4() throws Exception {
         String query = "A =LOAD 'file.txt' AS (a:(u,v), b, c);" +
         "B = FOREACH A GENERATE $0, b, flatten(1);" +
-        "C = FILTER B BY " + SIZE.class.getName() +"(*) > 5;" +
+        "C = FILTER B BY " + SIZE.class.getName() +"(TOTUPLE(*)) > 5;" +
         "STORE C INTO 'empty';";  
         LogicalPlan newLogicalPlan = buildPlan( query );
 

Modified: pig/trunk/test/org/apache/pig/test/TestProjectRange.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestProjectRange.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestProjectRange.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestProjectRange.java Thu May 12 21:14:00 2011
@@ -22,6 +22,7 @@ import static org.apache.pig.ExecType.LO
 import static org.apache.pig.ExecType.MAPREDUCE;
 import static org.junit.Assert.assertEquals;
 
+import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -80,10 +81,8 @@ public class TestProjectRange  {
             execType = PigServer.parseExecType(execTypeString);
         }
 
-        PrintWriter w = new PrintWriter(new FileWriter(INP_FILE_5FIELDS));
-        w.println("10\t20\t30\t40\t50");
-        w.println("11\t21\t31\t41\t51");
-        w.close();
+        String[] input = {"10\t20\t30\t40\t50", "11\t21\t31\t41\t51"};
+        Util.createLocalInputFile(INP_FILE_5FIELDS, input);
 
         if(execType == MAPREDUCE) {
             cluster = MiniCluster.buildCluster();
@@ -107,7 +106,7 @@ public class TestProjectRange  {
 
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
-
+        new File(INP_FILE_5FIELDS).delete();
         if(cluster != null)
             cluster.shutDown();
     }
@@ -415,11 +414,9 @@ public class TestProjectRange  {
         // without aliases
         query =
             "  l1 = load '" + INP_FILE_5FIELDS + "';"
-            + "f = foreach l1 generate ..$3  ;"
+            + "f = foreach l1 generate ..$3  as (a,b,c,d);"
             ; 
-        // the schema should be null, but that is not the case
-        // see - PIG-1910
-        //compileAndCompareSchema(null, query, "f");
+        compileAndCompareSchema("a : bytearray,b : bytearray,c : bytearray,d : bytearray", query, "f");
         
         
         Util.registerMultiLineQuery(pigServer, query);
@@ -430,8 +427,8 @@ public class TestProjectRange  {
         List<Tuple> expectedRes = 
             Util.getTuplesFromConstantTupleStringAsByteArray(
                     new String[] {
-                            "(10,20,30,40)",
-                            "(11,21,31,41)",
+                            "('10','20','30','40')",
+                            "('11','21','31','41')",
                     });
         Util.checkQueryOutputsAfterSort(it, expectedRes);
 
@@ -453,9 +450,7 @@ public class TestProjectRange  {
             "  l1 = load '" + INP_FILE_5FIELDS + "';"
             + "f = foreach l1 generate ..$0 as (first), $4.. as (last), $3 ..,  .. $1 ;"
             ; 
-        // the schema should be null, but that is not the case
-        // see - PIG-1910
-        //compileAndCompareSchema(null, query, "f");
+        compileAndCompareSchema((Schema)null, query, "f");
         
         
         Util.registerMultiLineQuery(pigServer, query);
@@ -466,8 +461,8 @@ public class TestProjectRange  {
         List<Tuple> expectedRes = 
             Util.getTuplesFromConstantTupleStringAsByteArray(
                     new String[] {
-                            "(10,50,40,50,10,20)",
-                            "(11,51,41,51,11,21)",
+                            "('10','50','40','50','10','20')",
+                            "('11','51','41','51','11','21')",
                     });
         Util.checkQueryOutputsAfterSort(it, expectedRes);
 
@@ -478,11 +473,8 @@ public class TestProjectRange  {
      * @throws IOException
      * @throws ParserException
      */
-    //@Test
+    @Test
     public void testRangeForeachWFilterNOSchema() throws IOException, ParserException {
-        //TODO: fix depends on PIG-1910
-        //fails with - In alias fil, incompatible types in GreaterThan 
-        //         Operator left hand side:Unknown right hand side:Unknown
         String query;
         
         query =
@@ -494,12 +486,12 @@ public class TestProjectRange  {
         Util.registerMultiLineQuery(pigServer, query);
 
         pigServer.explain("fil", System.err);
-        Iterator<Tuple> it = pigServer.openIterator("f");
+        Iterator<Tuple> it = pigServer.openIterator("fil");
 
         List<Tuple> expectedRes = 
             Util.getTuplesFromConstantTupleStringAsByteArray(
                     new String[] {
-                            "(11,51,41,51,11,21)",
+                            "('11','51','41','51','11','21')",
                     });
         Util.checkQueryOutputsAfterSort(it, expectedRes);
 

Added: pig/trunk/test/org/apache/pig/test/TestProjectStarRangeInUdf.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestProjectStarRangeInUdf.java?rev=1102461&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestProjectStarRangeInUdf.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestProjectStarRangeInUdf.java Thu May 12 21:14:00 2011
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.apache.pig.ExecType.LOCAL;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Test Project-(star/range) expansion when used as udf argument
+ */
+public class TestProjectStarRangeInUdf  {
+
+    protected final Log log = LogFactory.getLog(getClass());
+
+    protected static PigServer pigServer;
+    private static final String INP_FILE_5FIELDS = "TestProjectRange_5fields";
+
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        String[] input = {"10\t20\t30\t40\t50", "11\t21\t31\t41\t51"};
+        Util.createLocalInputFile(INP_FILE_5FIELDS, input);
+    }
+
+    @Before
+    public void setup() throws ExecException{
+        pigServer = new PigServer(LOCAL);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        pigServer.shutdown();
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        new File(INP_FILE_5FIELDS).delete();
+    }
+
+    @Test
+    public void testProjStarExpandInForeach1() throws IOException{
+        //star expansion lets CONCAT be used if input has two cols
+        String query;
+
+        query =
+            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a, b);"
+            + "f = foreach l1 generate CONCAT(*) as ct;"
+            ; 
+        compileAndCompareSchema("ct : bytearray", query, "f");
+    }
+
+    @Test
+    public void testProjStarExpandInForeach1Negative() throws IOException{
+        //star expansion gives 3 columns, so CONCAT(*) gives error
+        String query;
+
+        query =
+            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a, b, c);"
+            + "f = foreach l1 generate CONCAT(*) as ct;"
+            ; 
+        Util.checkExceptionMessage(query, "f",
+                "Could not infer the matching function for " +
+                "org.apache.pig.builtin.CONCAT");
+    }
+    
+    @Test
+    public void testProjStarExpandInForeach2() throws IOException {
+
+        String query;
+
+        query =
+            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int);"
+            + "f = foreach l1 generate TOTUPLE(*) as tb;"
+            ; 
+        compileAndCompareSchema("tb : (a : int, b : int, c : int)", query, "f");
+        Iterator<Tuple> it = pigServer.openIterator("f");
+
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "((10,20,30))",
+                            "((11,21,31))",
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+    }
+
+    @Test
+    public void testProjStarExpandInFilter1() throws IOException{
+        //TOBAG has * and a bincond expression as argument
+        String query;
+
+        query =
+            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int);"
+            + "f = filter l1 by SUM(TOBAG((a == 10 ? 100 : 0), *)) == 130;"
+            ; 
+        compileAndCompareSchema("a : int, b : int", query, "f");
+        Iterator<Tuple> it = pigServer.openIterator("f");
+
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "(10,20)",
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+    }
+    
+    @Test
+    public void testProjRangeExpandInFilterNoSchema1() throws IOException{
+        //project-range expands into separate TOBAG args, even with no schema
+        String query;
+
+        query =
+            "  l1 = load '" + INP_FILE_5FIELDS + "' ;"
+            + "f = filter l1 by SUM(TOBAG($0 .. $1)) == 30;"
+            ; 
+        compileAndCompareSchema((Schema)null, query, "f");
+        Iterator<Tuple> it = pigServer.openIterator("f");
+
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStringAsByteArray(
+                    new String[] {
+                            "('10','20','30','40','50')",
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+    }
+    
+    /**
+     * Test project-range as udf argument in foreach, with limits on both
+     * sides of the range
+     * @throws IOException
+     */
+    @Test
+    public void testProjRangeExpandInForeach() throws IOException {
+
+        String query;
+
+        query =
+            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a, b : chararray, c : chararray, d);"
+            + "f = foreach l1 generate CONCAT($1 .. $2) as ct;"
+            ; 
+        compileAndCompareSchema("ct : chararray", query, "f");
+        Iterator<Tuple> it = pigServer.openIterator("f");
+
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "('2030')",
+                            "('2131')",
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+    }
+    
+    @Test
+    public void testProjRangeExpandInJoin() throws IOException {
+        // project-range (by position and by alias) inside udf args of the join keys
+        String query;
+
+        query =
+            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : chararray, b : chararray, c : chararray, d);"
+            + "f1 = foreach l1 generate a, b, c, '1' as num;"
+            + "l2 = load '" + INP_FILE_5FIELDS + "' as (a : chararray, b : chararray, c : chararray, d);"
+            + "f2 = foreach l2 generate c, a, b, '2' as num;" 
+            + "j = join f1 by CONCAT(c, $0 .. $1), f2 by CONCAT($0, a .. b);"
+            ; 
+        String schStr =
+            "f1::a : chararray, f1::b : chararray, f1::c : chararray, f1::num : chararray," + 
+            "f2::c : chararray, f2::a : chararray, f2::b : chararray, f2::num : chararray";
+            
+        compileAndCompareSchema(schStr, query, "j");
+        Iterator<Tuple> it = pigServer.openIterator("j");
+
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "('10', '20', '30', '1', '30', '10', '20', '2')",
+                            "('11', '21', '31', '1', '31', '11', '21', '2')",
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+    }
+
+    
+    @Test
+    public void testProjMixExpand1() throws IOException {
+
+        String query;
+
+        query =
+            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int);"
+            + "f = foreach l1 generate TOBAG(*, $0 .. $2) as tt;"
+            ; 
+     
+        compileAndCompareSchema("tt : {(NullAlias : int)}", query, "f");
+        Iterator<Tuple> it = pigServer.openIterator("f");
+
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "({(10),(20),(30),(10),(20),(30)})",
+                            "({(11),(21),(31),(11),(21),(31)})",
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+    }
+    
+    @Test
+    public void testProjMixExpand1NoSchema() throws IOException {
+
+        String query;
+
+        query =
+            "  l1 = load '" + INP_FILE_5FIELDS + "';"
+            + "f = foreach l1 generate TOBAG(*, $0 .. $2) as tt;"
+            ; 
+        Schema sch = Utils.getSchemaFromString("tt : {(NullALias)}");
+        sch.getField(0).schema.getField(0).schema.getField(0).alias = null;
+        sch.getField(0).schema.getField(0).schema.getField(0).type = DataType.NULL;
+        
+        compileAndCompareSchema(sch, query, "f");
+        Iterator<Tuple> it = pigServer.openIterator("f");
+
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStringAsByteArray(
+                    new String[] {
+                            "({('10'),('20'),('30'),('40'),('50'),('10'),('20'),('30')})",
+                            "({('11'),('21'),('31'),('41'),('51'),('11'),('21'),('31')})",
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+    }
+    
+    @Test
+    public void testProjMixExpand2() throws IOException {
+
+        String query;
+
+        query =
+            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int, d : int);"
+            + "f = foreach l1 generate TOTUPLE(1, $0 .. $1, 2+3, $2 .. , d - 1) as tt;"
+            ; 
+     
+        String schStr = "tt : (NullAliasA : int, a : int, b : int," +
+            " NullAliasB : int, c : int, d : int, NullAliasC : int)";
+        compileAndCompareSchema(schStr, query, "f");
+        Iterator<Tuple> it = pigServer.openIterator("f");
+
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "((1,10,20,5,30,40,39))",
+                            "((1,11,21,5,31,41,40))",
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+    }
+    
+    @Test
+    public void testProjMixExpand2NoSchema() throws IOException {
+
+        String query;
+
+        query =
+            "  l1 = load '" + INP_FILE_5FIELDS + "' ;"
+            + "f = foreach l1 generate TOTUPLE(1, $0 .. $1, 2+3, $2 .. , $4 -1) as tt;"
+            ; 
+     
+        compileAndCompareSchema("tt :()", query, "f");
+        pigServer.explain("f", System.out);
+        Iterator<Tuple> it = pigServer.openIterator("f");
+
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStringAsByteArray(
+                    new String[] {
+                            "((1,'10','20',5,'30','40','50',49))",
+                            "((1,'11','21',5,'31','41','51',50))",
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+    }
+    
+    @Test
+    public void testProjMixExpand3() throws IOException {
+
+        String query;
+
+        query =
+            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : chararray, d : chararray);"
+            + "f = foreach l1 generate TOTUPLE($0 .. $1, CONCAT($2 .. )) as tt;"
+            ; 
+     
+        String schStr = "tt : (a : int, b : int, NullAlias : chararray)";
+        compileAndCompareSchema(schStr, query, "f");
+        Iterator<Tuple> it = pigServer.openIterator("f");
+
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "((10,20,'3040'))",
+                            "((11,21,'3141'))",
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+    }
+    
+    
+    private void compileAndCompareSchema(String expectedSchStr, String query, String alias)
+    throws IOException {
+
+        Schema expectedSch = null;
+        
+        if(expectedSchStr != null)
+            expectedSch = Utils.getSchemaFromString(expectedSchStr);
+        Util.schemaReplaceNullAlias(expectedSch);
+        compileAndCompareSchema(expectedSch, query, alias);
+
+    }
+
+    private void compileAndCompareSchema(Schema expectedSch, String query,
+            String alias) throws IOException {
+        Util.registerMultiLineQuery(pigServer, query);
+
+        Schema sch = pigServer.dumpSchema(alias);
+        assertEquals("Checking expected schema", expectedSch, sch);
+    }
+
+}

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1102461&r1=1102460&r2=1102461&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Thu May 12 21:14:00 2011
@@ -76,6 +76,8 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
@@ -621,6 +623,12 @@ public class Util {
         return queryParser.parseConstant(pigConstantAsString);
     }
     
+    /**
+     * Parse list of strings into list of tuples; quoted strings stay as String
+     * @param tupleConstants
+     * @return
+     * @throws ParserException
+     */
     public static List<Tuple> getTuplesFromConstantTupleStrings(String[] tupleConstants) throws ParserException {
         List<Tuple> result = new ArrayList<Tuple>(tupleConstants.length);
         for(int i = 0; i < tupleConstants.length; i++) {
@@ -629,19 +637,52 @@ public class Util {
         return result;
     }
 
+    /**
+     * Parse list of strings into list of tuples, convert quoted strings into
+     * DataByteArray
+     * @param tupleConstants
+     * @return
+     * @throws ParserException
+     * @throws ExecException
+     */
     public static List<Tuple> getTuplesFromConstantTupleStringAsByteArray(String[] tupleConstants)
     throws ParserException, ExecException {
         List<Tuple> tuples = getTuplesFromConstantTupleStrings(tupleConstants);
         for(Tuple t : tuples){
-            for(int i=0; i<t.size(); i++){
-                DataByteArray dba = (t.get(i) == null) ? 
-                        null : new DataByteArray(t.get(i).toString().getBytes());
-                t.set(i, dba);
-            }
+            convertStringToDataByteArray(t);
         }
         return tuples;
     }
     
+    /**
+     * Convert String objects in argument t to DataByteArray objects,
+     * recursing into nested tuples and into the tuples of nested bags
+     * @param t tuple converted in place; null is ignored
+     * @throws ExecException
+     */
+    private static void convertStringToDataByteArray(Tuple t) throws ExecException {
+        if(t == null)
+            return;
+
+        for(int i=0; i<t.size(); i++){
+            Object col = t.get(i);
+            // instanceof is false for null, so null columns fall through
+            // all branches and are left untouched
+            if(col instanceof String){
+                t.set(i, new DataByteArray((String)col));
+            }else if(col instanceof Tuple){
+                convertStringToDataByteArray((Tuple)col);
+            }else if(col instanceof DataBag){
+                // convert each tuple held by the bag
+                Iterator<Tuple> it = ((DataBag)col).iterator();
+                while(it.hasNext()){
+                    convertStringToDataByteArray(it.next());
+                }
+            }
+            // columns of any other type are left as they are
+        }
+    }
+
     public static File createFile(String[] data) throws Exception{
         File f = File.createTempFile("tmp", "");
         PrintWriter pw = new PrintWriter(f);
@@ -951,4 +992,23 @@ public class Util {
         return lp;
     }
     
+    
+    /**
+     * Sets to null every alias in the given schema (nested schemas included)
+     * whose name starts with "NullAlias", ignoring case and default locale.
+     * @param sch schema to modify in place; null is ignored
+     */
+    public static void schemaReplaceNullAlias(Schema sch){
+        if(sch == null)
+            return ;
+        final String prefix = "NullAlias";
+        for(FieldSchema fs : sch.getFields()){
+            if(fs.alias != null && fs.alias.regionMatches(true, 0, prefix, 0, prefix.length())){
+                fs.alias = null;
+            }
+            schemaReplaceNullAlias(fs.schema);
+        }
+    }
+
+    
 }