You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/11 01:06:16 UTC

svn commit: r703598 - in /incubator/pig/branches/types: ./ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/parser/ src/org/apache/pig/impl/logicalLayer/schema/ src/org/apache/pig/impl/logicalLayer/validators/ test/org/apache/...

Author: olga
Date: Fri Oct 10 16:06:16 2008
New Revision: 703598

URL: http://svn.apache.org/viewvc?rev=703598&view=rev
Log:
PIG-489: (*) handling

Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
Modified:
    incubator/pig/branches/types/CHANGES.txt
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java

Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Fri Oct 10 16:06:16 2008
@@ -278,3 +278,5 @@
 
     PIG-465: performance improvement - removing keys from the value (pradeepk
     via olgan)
+    
+    PIG-489: (*) processing (sms via olgan)

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Fri Oct 10 16:06:16 2008
@@ -83,6 +83,10 @@
         return mIsInner;
     }
 
+    public void setInner(boolean[] inner) {
+        mIsInner = inner; // keeps the caller's array itself; no copy is made
+    }
+
     @Override
     public String name() {
         return "CoGroup " + mKey.scope + "-" + mKey.id;
@@ -192,7 +196,7 @@
                                         setFieldSchemaParent(groupByFs, positionOperators, i);
                                         break;
                                     } else {
-                                        if(j < aliases.length) {
+                                        if((j + 1) < aliases.length) {
                                             continue;
                                         } else {
                                             //we have seen this alias before
@@ -205,6 +209,8 @@
                                                     Schema.FieldSchema opFs = op.getFieldSchema();
                                                     if(null != opFs) {
                                                         groupByFs.setParent(opFs.canonicalName, eOp);
+                                                    } else {
+                                                        groupByFs.setParent(null, eOp);
                                                     }
                                                 }
                                             } else {
@@ -231,8 +237,26 @@
                 } else {
                     //We do not have any alias for this position in the group by columns
                     //We have positions $1, $2, etc.
-                    groupByFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-                    groupByFss.add(groupByFs);
+                    Collection<ExpressionOperator> cEops = positionOperators.get(i);
+                    if(null != cEops) {
+                        ExpressionOperator eOp = (ExpressionOperator) (cEops.toArray())[0];
+                        if(null != eOp) {
+                            Schema.FieldSchema fs = eOp.getFieldSchema();
+                            if(null != fs) {
+                                groupByFs = new Schema.FieldSchema(null, fs.schema, fs.type);
+                                groupByFss.add(groupByFs);
+                            } else {
+                                groupByFs = new Schema.FieldSchema(null, null, DataType.BYTEARRAY);
+                                groupByFss.add(groupByFs);
+                            }
+                        } else {
+                            groupByFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                            groupByFss.add(groupByFs);
+                        }
+                    } else {
+                        groupByFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                        groupByFss.add(groupByFs);
+                    }
                     setFieldSchemaParent(groupByFs, positionOperators, i);
                 }
             }            
@@ -266,7 +290,11 @@
                     }
                 }
                 
-                fss.add(new Schema.FieldSchema("group", mergedGroupSchema));
+                Schema.FieldSchema groupByFs = new Schema.FieldSchema("group", mergedGroupSchema);
+                fss.add(groupByFs);
+                for(int i = 0; i < arity; ++i) {
+                    setFieldSchemaParent(groupByFs, positionOperators, i);
+                }
             }
             for (LogicalOperator op : inputs) {
                 try {
@@ -341,6 +369,7 @@
             throw new FrontendException("getAtomicGroupByType is used only when"
                                      + " dealing with atomic group col") ;
         }
+
         byte groupType = DataType.BYTEARRAY ;
         // merge all the inner plan outputs so we know what type
         // our group column should be
@@ -388,13 +417,32 @@
             List<LogicalPlan> innerPlans
                         = new ArrayList<LogicalPlan>(getGroupByPlans().get(input)) ;
 
+            boolean seenProjectStar = false;
             for(int j=0;j < innerPlans.size(); j++) {
                 byte innerType = innerPlans.get(j).getSingleLeafPlanOutputType() ;
                 ExpressionOperator eOp = (ExpressionOperator)innerPlans.get(j).getSingleLeafPlanOutputOp();
+
+                if(eOp instanceof LOProject) {
+                    if(((LOProject)eOp).isStar()) {
+                        seenProjectStar = true;
+                    }
+                }
+                        
                 Schema.FieldSchema groupFs = fsList.get(j);
                 groupFs.type = DataType.mergeType(groupFs.type, innerType) ;
-                groupFs.setParent(eOp.getFieldSchema().canonicalName, eOp);
+                Schema.FieldSchema fs = eOp.getFieldSchema();
+                if(null != fs) {
+                    groupFs.setParent(eOp.getFieldSchema().canonicalName, eOp);
+                } else {
+                    groupFs.setParent(null, eOp);
+                }
             }
+
+            if(seenProjectStar) {
+                throw new FrontendException("Grouping attributes can either be star (*) or a list of expressions, but not both.");
+                
+            }
+
         }
 
         return new Schema(fsList) ;
@@ -405,6 +453,8 @@
             Schema.FieldSchema opFs = op.getFieldSchema();
             if(null != opFs) {
                 fs.setParent(opFs.canonicalName, op);
+            } else {
+                fs.setParent(null, op);
             }
         }
     }
@@ -415,6 +465,8 @@
             for(Schema.FieldSchema inputFs: s.getFields()) {
                 if(null != inputFs) {
                     fs.setParent(inputFs.canonicalName, op);
+                } else {
+                    fs.setParent(null, op);
                 }
             }
         } else {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Fri Oct 10 16:06:16 2008
@@ -86,10 +86,26 @@
         return mForEachPlans;
     }
 
+    public void setForEachPlans(ArrayList<LogicalPlan> foreachPlans) {
+        mForEachPlans = foreachPlans; // replaces the nested plan list with the caller's list (not copied)
+    }
+
     public List<Boolean> getFlatten() {
         return mFlatten;
     }
 
+    public void setFlatten(ArrayList<Boolean> flattenList) {
+        mFlatten = flattenList; // ProjectStarTranslator passes one flag per nested plan
+    }
+
+    public List<Schema> getUserDefinedSchema() {
+        return mUserDefinedSchema; // the internal list itself, not a copy
+    }
+
+    public void setUserDefinedSchema(ArrayList<Schema> userDefinedSchema) {
+        mUserDefinedSchema = userDefinedSchema; // ProjectStarTranslator passes one entry (possibly null) per nested plan
+    }
+
     @Override
     public String name() {
         return "ForEach " + mKey.scope + "-" + mKey.id;
@@ -148,6 +164,24 @@
                 log.debug("Flatten: " + mFlatten.get(planCtr));
                 Schema.FieldSchema planFs;
 
+                if(op instanceof LOProject) {
+                    //the check for the type is required for statements like
+                    //foreach cogroup {
+                    // a1 = order a by *;
+                    // generate a1;
+                    //}
+                    //In the above script, the generate a1, will translate to 
+                    //project(a1) -> project(*) and will not be translated to a sequence of projects
+                    //As a result the project(*) will remain but the return type is a bag
+                    //project(*) with a data type set to tuple indicates a project(*) from an input
+                    //that has no schema
+                    if( (((LOProject)op).isStar() ) && (((LOProject)op).getType() == DataType.TUPLE) ) {
+                        mSchema = null;
+                        mIsSchemaComputed = true;
+                        return mSchema;
+                    }
+                }
+                
                 try {
 	                planFs = ((ExpressionOperator)op).getFieldSchema();
                     log.debug("planFs: " + planFs);
@@ -285,9 +319,9 @@
                                 throw new FrontendException(pe.getMessage());
                             }
                         } else {
-                            Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-						    fss.add(newFs);
-                            newFs.setParent(null, op);
+                            mSchema = null;
+                            mIsSchemaComputed = true;
+                            return mSchema;
                         }
 					}
                 } catch (FrontendException fee) {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java Fri Oct 10 16:06:16 2008
@@ -205,7 +205,7 @@
                             //TODO
                             //the type of the operator will be unknown. when type checking is in place
                             //add the type of the operator as a parameter to the fieldschema creation
-                            mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema(), DataType.TUPLE);
+                            mFieldSchema = new Schema.FieldSchema(null, expressionOperator.getSchema(), DataType.TUPLE);
                             mFieldSchema.setParent(null, expressionOperator);
                             //mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema());
                         }
@@ -218,9 +218,6 @@
                     mIsFieldSchemaComputed = false;
                     throw fee;
                 }
-                log.debug("mIsStar is true, returning schema of expressionOperator");
-                log.debug("Exiting getSchema()");
-                return mFieldSchema;
             } else {
                 //its n list of columns to project including a single column
                 List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>(mProjection.size());
@@ -323,8 +320,8 @@
                 mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), new Schema(fss));
                 mFieldSchema.setParent(null, expressionOperator);
                 mIsFieldSchemaComputed = true;
-                log.debug("mIsStar is false, returning computed field schema of expressionOperator");
             }
+
         }
 
         if(null != mFieldSchema) {
@@ -339,7 +336,9 @@
                 mFieldSchema = new Schema.FieldSchema(getAlias(), pjSchema, DataType.TUPLE);
                 mFieldSchema.setParent(null, expressionOperator);
             } else {
-                mFieldSchema.type = DataType.TUPLE;
+                if(null != mFieldSchema) {
+                    mFieldSchema.type = DataType.TUPLE;
+                }
             }
             setOverloaded(true);
             setType(DataType.TUPLE);
@@ -351,7 +350,9 @@
                     mFieldSchema = new Schema.FieldSchema(getAlias(), pjSchema, DataType.BAG);
                     mFieldSchema.setParent(null, expressionOperator);
                 } else {
-                    mFieldSchema.type = DataType.BAG;
+                    if(null != mFieldSchema) {
+                        mFieldSchema.type = DataType.BAG;
+                    }
                 }
                 setType(DataType.BAG);
             }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java Fri Oct 10 16:06:16 2008
@@ -77,10 +77,18 @@
         return mSortColPlans;
     }
 
+    public void setSortColPlans(List<LogicalPlan> sortPlans) {
+        mSortColPlans = sortPlans; // replaces the sort column plans with the caller's list (not copied)
+    }
+
     public List<Boolean> getAscendingCols() {
         return mAscCols;
     }
 
+    public void setAscendingCols(List<Boolean> ascCols) {
+        mAscCols = ascCols; // ProjectStarTranslator passes one flag per sort column plan
+    }
+
     public FuncSpec getUserFunc() {
         return mSortFunc;
     }

Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java?rev=703598&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java Fri Oct 10 16:06:16 2008
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.List;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Map;
+import java.util.ArrayList;
+
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.data.DataType;
+
+/**
+ * A visitor to walk operators that contain a nested plan and translate project( * )
+ * operators to a list of projection operators, i.e., 
+ * project( * ) -> project(0), project(1), ... project(n-2), project(n-1)
+ */
+public class ProjectStarTranslator extends
+        LOVisitor {
+
+    public ProjectStarTranslator(LogicalPlan plan) {
+        super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan)); // walker is built over the same plan that is visited
+    }
+
+    /**
+     * Rebuilds each input's group-by plan list, expanding plans that end in a non-bag project( * ).
+     * @param cg
+     *            the logical cogroup operator that has to be visited
+     * @throws VisitorException if a group-by plan cannot be translated
+     */
+    protected void visit(LOCogroup cg) throws VisitorException {
+        //get the attributes of cogroup that are modified during the translation
+        
+        MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cg.getGroupByPlans();
+
+        for(LogicalOperator op: cg.getInputs()) {
+            ArrayList<LogicalPlan> newGByPlans = new ArrayList<LogicalPlan>();
+            for(LogicalPlan lp: mapGByPlans.get(op)) {
+                if (checkPlanForProjectStar(lp)) {
+                    ArrayList<LogicalPlan> translatedPlans = translateProjectStarInPlan(lp);
+                    for(int j = 0; j < translatedPlans.size(); ++j) {
+                        newGByPlans.add(translatedPlans.get(j));
+                    }
+                } else {
+                    newGByPlans.add(lp); // nothing to expand: keep the plan as it is
+                }
+            }
+            mapGByPlans.removeKey(op); // drop the old plans before storing the rebuilt list
+            mapGByPlans.put(op, newGByPlans);
+        }
+    }
+
+    /**
+     * Expands each nested plan ending in project( * ); flatten flags and user schemas follow the new plans.
+     * @param forEach
+     *            the logical foreach operator that has to be visited
+     * @throws VisitorException if a plan cannot be translated or a user schema field cannot be read
+     */
+    protected void visit(LOForEach forEach) throws VisitorException {
+        //get the attributes of foreach that are modified during the translation
+
+        super.visit(forEach);
+
+        //List of inner plans
+        ArrayList<LogicalPlan> foreachPlans = forEach.getForEachPlans();
+        ArrayList<LogicalPlan> newForeachPlans = new ArrayList<LogicalPlan>();
+        
+        //the flatten list
+        List<Boolean> flattenList = forEach.getFlatten();
+        ArrayList<Boolean> newFlattenList = new ArrayList<Boolean>();
+        
+        //user specified schemas in the as clause; the list is null-checked before every use
+        List<Schema> userDefinedSchemaList = forEach.getUserDefinedSchema();
+        ArrayList<Schema> newUserDefinedSchemaList = new ArrayList<Schema>();
+
+        for(int i = 0; i < foreachPlans.size(); ++i) {
+            LogicalPlan lp = foreachPlans.get(i);
+            if(checkPlanForProjectStar(lp)) {
+                ArrayList<LogicalPlan> translatedPlans = translateProjectStarInPlan(lp);
+                Schema s = (null != userDefinedSchemaList) ? userDefinedSchemaList.get(i) : null;
+                for(int j = 0; j < translatedPlans.size(); ++j) {
+                    LogicalPlan translatedPlan = translatedPlans.get(j);
+                    newForeachPlans.add(translatedPlan);
+                    newFlattenList.add(flattenList.get(i)); // each expanded column repeats the original flatten flag
+                    if(null != s) {
+                        try {
+                            if(j < s.size()) { // the user schema may name fewer columns than were expanded
+                                newUserDefinedSchemaList.add(new Schema(s.getField(j)));
+                            } else {
+                                newUserDefinedSchemaList.add(null);
+                            }
+                        } catch (ParseException pe) {
+                            throw new VisitorException(pe.getMessage(), pe);
+                        }
+                    } else {
+                        newUserDefinedSchemaList.add(null);
+                    }
+                }
+            } else {
+                newForeachPlans.add(lp);
+                newFlattenList.add(flattenList.get(i));
+                if(null != userDefinedSchemaList) {
+                    newUserDefinedSchemaList.add(userDefinedSchemaList.get(i));
+                } else {
+                    newUserDefinedSchemaList.add(null);
+                }
+            }
+        }
+        forEach.setForEachPlans(newForeachPlans);
+        forEach.setFlatten(newFlattenList);
+        forEach.setUserDefinedSchema(newUserDefinedSchemaList);
+
+    }
+
+    /**
+     * Expands each sort column plan ending in a non-bag project( * ) and repeats its ascending flag.
+     * @param s
+     *            the logical sort operator that has to be visited
+     * @throws VisitorException if a sort column plan cannot be translated
+     */
+    protected void visit(LOSort s) throws VisitorException {
+        //get the attributes of sort that are modified during the translation
+
+        //List of inner plans
+        List<LogicalPlan> sortPlans = s.getSortColPlans();
+        ArrayList<LogicalPlan> newSortPlans = new ArrayList<LogicalPlan>();
+
+        //sort order
+        List<Boolean> sortOrder = s.getAscendingCols();
+        ArrayList<Boolean> newSortOrder = new ArrayList<Boolean>();
+        
+        for(int i = 0; i < sortPlans.size(); ++i) {
+            LogicalPlan lp = sortPlans.get(i);
+            if(checkPlanForProjectStar(lp)) {
+                ArrayList<LogicalPlan> translatedPlans = translateProjectStarInPlan(lp);
+                for(int j = 0; j < translatedPlans.size(); ++j) {
+                    newSortPlans.add(translatedPlans.get(j));
+                    newSortOrder.add(sortOrder.get(i)); // every expanded column inherits the original plan's flag
+                }
+            } else {
+                newSortPlans.add(lp);
+                newSortOrder.add(sortOrder.get(i));
+            }
+        }
+        s.setSortColPlans(newSortPlans);
+        s.setAscendingCols(newSortOrder);
+    }
+
+    private boolean checkPlanForProjectStar(LogicalPlan lp) { // true if some leaf is a project( * ) whose type is not BAG
+        List<LogicalOperator> leaves = lp.getLeaves();
+
+        for(LogicalOperator op: leaves) {
+            if(op instanceof LOProject) {
+                if(((LOProject) op).isStar() && ((LOProject)op).getType() != DataType.BAG) { // bag-typed stars are not reported
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+    private LOProject getProjectStarFromPlan(LogicalPlan lp) { // first leaf accepted by checkPlanForProjectStar, or null
+        List<LogicalOperator> leaves = lp.getLeaves();
+
+        for(LogicalOperator op: leaves) {
+            if(op instanceof LOProject) {
+                if(((LOProject) op).isStar() && ((LOProject)op).getType() != DataType.BAG) { // same test as checkPlanForProjectStar
+                    return (LOProject)op;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    private ArrayList<LogicalPlan> translateProjectStarInPlan(LogicalPlan lp) throws VisitorException {
+        //translate the project( * ) into a list of projections
+        LOProject projectStar = getProjectStarFromPlan(lp); // callers check checkPlanForProjectStar(lp) first; null is not handled here
+        LogicalOperator projectInput = projectStar.getExpression();
+        ArrayList<LogicalPlan> translatedPlans = new ArrayList<LogicalPlan>();
+        Schema s = null;
+        try {
+            if(!(projectInput instanceof ExpressionOperator)) {
+                s = projectInput.getSchema(); // non-expression input: use the operator's own schema
+            } else {
+                Schema.FieldSchema fs = ((ExpressionOperator)projectInput).getFieldSchema();
+                if(null != fs) {
+                    s = fs.schema; // expression input: use the schema nested in its field schema
+                }
+            }
+        } catch (FrontendException fee) {
+            throw new VisitorException(fee.getMessage(), fee);
+        }
+        if (null != s) {
+            for(int i = 0; i < s.size(); ++i) { // one plan per column of the input schema
+                LogicalPlan replicatedPlan = replicatePlan(lp);
+                replaceProjectStar(replicatedPlan, projectStar, i);
+                translatedPlans.add(replicatedPlan);
+            }
+        } else {
+            translatedPlans.add(replicatePlan(lp)); // schema unknown: single copy with project( * ) left in place
+        }
+        return translatedPlans;
+    }
+
+    private LogicalPlan replicatePlan(LogicalPlan lp) throws VisitorException { // new plan over the same operator objects; nothing is cloned
+        LogicalPlan replicatedPlan = new LogicalPlan();
+
+        for(LogicalOperator root: lp.getRoots()) {
+            replicatedPlan.add(root);
+            addSuccessors(lp, replicatedPlan, root); // copy everything reachable from this root
+        }
+
+        return replicatedPlan;
+    }
+
+    private void addSuccessors(LogicalPlan lp, LogicalPlan replicatedPlan, LogicalOperator root) throws VisitorException {
+        List<LogicalOperator> successors = lp.getSuccessors(root); // successors are read from the source plan lp
+        if(null == successors) return; // null is treated as "no successors": recursion stops here
+        for(LogicalOperator succ: successors) {
+            replicatedPlan.add(succ);
+            try {
+                replicatedPlan.connect(root, succ); // recreate the root -> succ edge in the copy
+            } catch (PlanException pe) {
+                throw new VisitorException(pe.getMessage(), pe);
+            }
+            addSuccessors(lp, replicatedPlan, succ); // NOTE(review): an operator reachable by two paths is added and connected again -- confirm that is safe
+        }
+    }
+
+    private void replaceProjectStar(LogicalPlan lp, LOProject projectStar, int column) throws VisitorException {
+        String scope = projectStar.getOperatorKey().getScope(); // the new operator key is generated in the star's scope
+        LogicalOperator projectInput = projectStar.getExpression();
+        LogicalPlan projectPlan = projectStar.getPlan(); // NOTE(review): the star's own plan, not lp -- confirm intended
+        LOProject replacementProject = new LOProject(projectPlan, OperatorKey.genOpKey(scope), projectInput, column); 
+        try {
+            lp.replace(projectStar, replacementProject); // swap the star for the single-column projection inside lp
+        } catch (PlanException pe) {
+            throw new VisitorException(pe.getMessage(), pe);
+        }
+    }
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Oct 10 16:06:16 2008
@@ -55,6 +55,7 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.impl.logicalLayer.ProjectStarTranslator;
 
 
 public class QueryParser {
@@ -764,7 +765,13 @@
 	{ 
 		if(null != root) {
             log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
+
+            //Translate all the project(*) leaves in the plan to a sequence of projections
+            ProjectStarTranslator translate = new ProjectStarTranslator(lp);
+            translate.visit();
+
             addLogicalPlan(root, lp);
+
             try {
 			    log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
             } catch(FrontendException fee) {
@@ -2479,7 +2486,9 @@
 	(
 		{ 
 			Schema.FieldSchema fs = item.getFieldSchema(); 
-			subSchema = fs.schema; 
+            if(null != fs) {
+			    subSchema = fs.schema; 
+            }
 			log.debug("subSchema: " + subSchema);
 		}
 		( 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Fri Oct 10 16:06:16 2008
@@ -390,6 +390,7 @@
                 sb.append(" cn: ");
                 sb.append(canonicalName);
             }
+
             return sb.toString();
         }
 
@@ -515,9 +516,9 @@
         mAliases = new HashMap<String, FieldSchema>(fields.size());
         mFieldSchemas = new MultiMap<String, String>();
         for (FieldSchema fs : fields) {
-            if (fs.alias != null) {
-                mAliases.put(fs.alias, fs);
-                if(null != fs) {
+            if(null != fs) {
+                if (fs.alias != null) {
+                    mAliases.put(fs.alias, fs);
                     mFieldSchemas.put(fs.canonicalName, fs.alias);
                 }
             }
@@ -533,9 +534,9 @@
         mFields.add(fieldSchema);
         mAliases = new HashMap<String, FieldSchema>(1);
         mFieldSchemas = new MultiMap<String, String>();
-        if (fieldSchema.alias != null) {
-            mAliases.put(fieldSchema.alias, fieldSchema);
-            if(null != fieldSchema) {
+        if(null != fieldSchema) {
+            if (fieldSchema.alias != null) {
+                mAliases.put(fieldSchema.alias, fieldSchema);
                 mFieldSchemas.put(fieldSchema.canonicalName, fieldSchema.alias);
             }
         }
@@ -554,13 +555,11 @@
             try {
                 for (int i = 0; i < s.size(); ++i) {
                     FieldSchema fs = new FieldSchema(s.getField(i));
+                    mFields.add(fs);
                     if(null != fs) {
-                        mFields.add(fs);
                         if (fs.alias != null) {
                             mAliases.put(fs.alias, fs);
-                            if(null != fs) {
-                                mFieldSchemas.put(fs.canonicalName, fs.alias);
-                            }
+                            mFieldSchemas.put(fs.canonicalName, fs.alias);
                         }
                     }
                 }
@@ -850,6 +849,11 @@
 
                 FieldSchema fs = schema.getField(i) ;
 
+                if(fs == null) {
+                    sb.append("null");
+                    continue;
+                }
+                
                 if (fs.alias != null) {
                     sb.append(fs.alias);
                     sb.append(": ");
@@ -870,8 +874,9 @@
                     }
                 } else if (fs.type == DataType.MAP) {
                     sb.append(DataType.findTypeName(fs.type) + "[ ]") ;
+                } else {
+                    sb.append(DataType.findTypeName(fs.type)) ;
                 }
-                // TODO: Support Map
             }
         }
 
@@ -886,8 +891,11 @@
 
     public void add(FieldSchema f) {
         mFields.add(f);
-        if (null != f.alias) {
-            mAliases.put(f.alias, f);
+        if(null != f) {
+            mFieldSchemas.put(f.canonicalName, f.alias);
+            if (null != f.alias) {
+                mAliases.put(f.alias, f);
+            }
         }
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Fri Oct 10 16:06:16 2008
@@ -1963,8 +1963,7 @@
         } catch (FrontendException fe) {
             String msg = "Cannot resolve COGroup output schema" ;
             msgCollector.collect(msg, MessageType.Error) ;
-            VisitorException vse = new VisitorException(msg) ;
-            vse.initCause(fe) ;
+            VisitorException vse = new VisitorException(msg, fe) ;
             throw vse ;
         }
         MultiMap<LogicalOperator, LogicalPlan> groupByPlans
@@ -2034,7 +2033,7 @@
                         byte innerType = innerPlan.getSingleLeafPlanOutputType() ;
                         byte expectedType = DataType.BYTEARRAY ;
 
-                        if (!DataType.isAtomic(innerType)) {
+                        if (!DataType.isAtomic(innerType) && (DataType.TUPLE != innerType)) {
                             String msg = "Sorry, group by complex types"
                                        + " will be supported soon" ;
                             msgCollector.collect(msg, MessageType.Error) ;
@@ -2065,8 +2064,7 @@
         catch (FrontendException fe) {
             String msg = "Cannot resolve COGroup output schema" ;
             msgCollector.collect(msg, MessageType.Error) ;
-            VisitorException vse = new VisitorException(msg) ;
-            vse.initCause(fe) ;
+            VisitorException vse = new VisitorException(msg, fe) ;
             throw vse ;
         }
 
@@ -2633,9 +2631,7 @@
                     || (op instanceof LOSplitOutput)
                     || (op instanceof LOLimit)) {
                 LogicalPlan lp = op.getPlan();
-                LoadFunc lf = getLoadFunc(lp.getPredecessors(op).get(0), parentCanonicalName);
-                return lf;
-                //return getLoadFunc(lp.getPredecessors(op).get(0), parentCanonicalName);        
+                return getLoadFunc(lp.getPredecessors(op).get(0), parentCanonicalName);        
             }
             
             Schema s = op.getSchema();
@@ -2711,4 +2707,5 @@
         throw new FrontendException("Found more than one load function to use: " + loadFuncMap.keySet());
     }
 
+
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Fri Oct 10 16:06:16 2008
@@ -44,7 +44,6 @@
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.ExecType;
-//import org.apache.pig.impl.builtin.ShellBagEvalFunc;
 import org.apache.pig.impl.builtin.GFAny;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -60,6 +59,7 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser ;
 import org.apache.pig.impl.logicalLayer.parser.ParseException ;
+import org.apache.pig.impl.util.MultiMap;
 
 
 public class TestLogicalPlanBuilder extends junit.framework.TestCase {
@@ -290,6 +290,16 @@
     }
     
     @Test
+    public void testQuery22Fail() {
+        buildPlan("A = load 'a';");
+        try {
+            buildPlan("B = group A by (*, $0);");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Grouping attributes can either be star (*"));
+        }
+    }
+    
+    @Test
     public void testQuery23() {
         buildPlan("A = load 'a';");
         buildPlan("B = load 'b';");
@@ -312,6 +322,17 @@
     }
 
     @Test
+    public void testQuery23Fail() {
+        buildPlan("A = load 'a';");
+        buildPlan("B = load 'b';");
+        try {
+            buildPlan("C = group A by (*, $0), B by ($0, $1);");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Grouping attributes can either be star (*"));
+        }
+    }
+
+    @Test
     public void testQuery24() {
         buildPlan("a = load 'a';");
         
@@ -1525,12 +1546,12 @@
         // by unambiguous free form alias, fully qualified alias
         // and partially qualified unambiguous alias
         String query = "a = load 'st10k' as (name, age, gpa);" +
-"b = group a by name;" +
-"c = foreach b generate flatten(a);" +
-"d = filter c by name != 'fred';" +
-"e = group d by name;" +
-"f = foreach e generate flatten(d);" +
-"g = foreach f generate name, d::a::name, a::name;";
+            "b = group a by name;" +
+            "c = foreach b generate flatten(a);" +
+            "d = filter c by name != 'fred';" +
+            "e = group d by name;" +
+            "f = foreach e generate flatten(d);" +
+            "g = foreach f generate name, d::a::name, a::name;";
         buildPlan(query);
     }
     
@@ -1539,12 +1560,145 @@
         // test that the alias "group" can be used
         // after a flatten(group)
         String query = "a = load 'st10k' as (name, age, gpa);" +
-"b = group a by name;" +
-"c = foreach b generate flatten(group), COUNT(a) as cnt;" +
-"d = foreach c generate group;";
+            "b = group a by name;" +
+            "c = foreach b generate flatten(group), COUNT(a) as cnt;" +
+            "d = foreach c generate group;";
         buildPlan(query);
     }
     
+    @Test
+    public void testQuery106()  throws FrontendException, ParseException {
+        LogicalPlan lp;
+        LOForEach foreach;
+
+        buildPlan("a = load 'one' as (name, age, gpa);");
+
+        lp = buildPlan("b = foreach a generate *;");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+        assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray"), false, true));
+
+    }
+
+    @Test
+    public void testQuery107()  throws FrontendException, ParseException {
+        LogicalPlan lp;
+        LOForEach foreach;
+
+        buildPlan("a = load 'one';");
+
+        lp = buildPlan("b = foreach a generate *;");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        assertTrue(checkPlanForProjectStar(foreachPlan));
+
+    }
+
+    @Test
+    public void testQuery108()  throws FrontendException, ParseException {
+        LogicalPlan lp;
+        LOCogroup cogroup;
+
+        buildPlan("a = load 'one' as (name, age, gpa);");
+
+        lp = buildPlan("b = group a by *;");
+        cogroup = (LOCogroup) lp.getLeaves().get(0);
+        Schema groupSchema = getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
+        Schema bagASchema = getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
+        Schema.FieldSchema groupFs = new Schema.FieldSchema("group", groupSchema, DataType.TUPLE);
+        Schema.FieldSchema bagAFs = new Schema.FieldSchema("a", bagASchema, DataType.BAG);
+        Schema expectedSchema = new Schema(groupFs);
+        expectedSchema.add(bagAFs);
+        assertTrue(Schema.equals(cogroup.getSchema(), expectedSchema, false, true));
+
+    }
+
+    @Test
+    public void testQuery109()  throws FrontendException, ParseException {
+        LogicalPlan lp;
+        LOCogroup cogroup;
+
+        buildPlan("a = load 'one' as (name, age, gpa);");
+        buildPlan("b = load 'two' as (first_name, enrol_age, high_school_gpa);");
+
+        lp = buildPlan("c = group a by *, b by *;");
+        cogroup = (LOCogroup) lp.getLeaves().get(0);
+        Schema groupSchema = getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
+        Schema bagASchema = getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
+        Schema bagBSchema = getSchemaFromString("first_name: bytearray, enrol_age: bytearray, high_school_gpa: bytearray");
+        Schema.FieldSchema groupFs = new Schema.FieldSchema("group", groupSchema, DataType.TUPLE);
+        Schema.FieldSchema bagAFs = new Schema.FieldSchema("a", bagASchema, DataType.BAG);
+        Schema.FieldSchema bagBFs = new Schema.FieldSchema("b", bagBSchema, DataType.BAG);
+        Schema expectedSchema = new Schema(groupFs);
+        expectedSchema.add(bagAFs);
+        expectedSchema.add(bagBFs);
+        assertTrue(Schema.equals(cogroup.getSchema(), expectedSchema, false, true));
+
+    }
+
+    @Test
+    public void testQuery110()  throws FrontendException, ParseException {
+        LogicalPlan lp;
+        LOLoad load;
+        LOCogroup cogroup;
+
+        buildPlan("a = load 'one' as (name, age, gpa);");
+        lp = buildPlan("b = load 'two';");
+
+        load = (LOLoad) lp.getLeaves().get(0);
+
+        lp = buildPlan("c = cogroup a by $0, b by *;");
+        cogroup = (LOCogroup) lp.getLeaves().get(0);
+
+        MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cogroup.getGroupByPlans();
+        LogicalPlan cogroupPlan = (LogicalPlan)(mapGByPlans.get(load).toArray())[0];
+        assertTrue(checkPlanForProjectStar(cogroupPlan) == true);
+
+    }
+
+    @Test
+    public void testQuery111()  throws FrontendException, ParseException {
+        LogicalPlan lp;
+        LOSort sort;
+
+        buildPlan("a = load 'one' as (name, age, gpa);");
+
+        lp = buildPlan("b = order a by *;");
+        sort = (LOSort) lp.getLeaves().get(0);
+
+        for(LogicalPlan sortPlan: sort.getSortColPlans()) {
+            assertTrue(checkPlanForProjectStar(sortPlan) == false);
+        }
+
+    }
+
+    @Test
+    public void testQuery112()  throws FrontendException, ParseException {
+        LogicalPlan lp;
+        LOForEach foreach;
+        LOSort sort;
+
+        buildPlan("a = load 'one' as (name, age, gpa);");
+
+        buildPlan("b = group a by *;");
+        lp = buildPlan("c = foreach b {a1 = order a by *; generate a1;};");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+
+        for(LogicalPlan foreachPlan: foreach.getForEachPlans()) {
+            printPlan(foreachPlan);
+            assertTrue(checkPlanForProjectStar(foreachPlan) == true);
+        }
+
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        sort = (LOSort)foreachPlan.getPredecessors(foreachPlan.getLeaves().get(0)).get(0);
+
+        for(LogicalPlan sortPlan: sort.getSortColPlans()) {
+            assertTrue(checkPlanForProjectStar(sortPlan) == true);
+        }
+
+    }
+
     private Schema getSchemaFromString(String schemaString) throws ParseException {
         return getSchemaFromString(schemaString, DataType.BYTEARRAY);
     }
@@ -1568,6 +1722,20 @@
         System.err.println();
     }
     
+    private boolean checkPlanForProjectStar(LogicalPlan lp) {
+        List<LogicalOperator> leaves = lp.getLeaves();
+
+        for(LogicalOperator op: leaves) {
+            if(op instanceof LOProject) {
+                if(((LOProject) op).isStar()) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
     // Helper Functions
     
     // Helper Functions
@@ -1598,24 +1766,6 @@
             
             //System.err.println("Query: " + query);
             
-            //Just the top level roots and their children
-            //Need a recursive one to travel down the tree
-            /*
-            for(LogicalOperator op: lp.getRoots()) {
-                System.err.println("Logical Plan Root: " + op.getClass().getName() + " object " + op);    
-
-                List<LogicalOperator> listOp = lp.getSuccessors(op);
-                
-                if(null != listOp) {
-                    Iterator<LogicalOperator> iter = listOp.iterator();
-                    while(iter.hasNext()) {
-                        LogicalOperator lop = iter.next();
-                        System.err.println("Successor: " + lop.getClass().getName() + " object " + lop);
-                    }
-                }
-            }
-            */
-
             assertNotNull(lp != null);
             return lp;
         } catch (IOException e) {

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=703598&r1=703597&r2=703598&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java Fri Oct 10 16:06:16 2008
@@ -3009,6 +3009,72 @@
     }
 
     @Test
+    public void testGroupLineageStar() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (name, age, gpa);") ;
+        planTester.buildPlan("b = group a by *;") ;
+        planTester.buildPlan("c = foreach b generate flatten(group);") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $0 + 1;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+    }
+
+    @Test
+    public void testGroupLineageStarNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = group a by *;") ;
+        planTester.buildPlan("c = foreach b generate flatten(group);") ;
+        LogicalPlan plan = planTester.buildPlan("d = foreach c generate $0 + 1;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+    }
+
+    @Test
     public void testCogroupLineage() throws Throwable {
         planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
         planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
@@ -3090,6 +3156,385 @@
     }
 
     @Test
+    public void testCogroupStarLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'b' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by *, b by * ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
+        LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, field1 + 1, field4 + 2.0;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        try {
+            typeValidator.validate(plan, collector) ;
+        }
+        catch (PlanValidationException pve) {
+            //not good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupStarLineageFail() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'b' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by *, b by * ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
+        LogicalPlan plan = planTester.buildPlan("e = foreach d generate group + 1, field1 + 1, field4 + 2.0;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        try {
+            typeValidator.validate(plan, collector) ;
+            fail("Exception expected") ;
+        }
+        catch (PlanValidationException pve) {
+            //not good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (!collector.hasError()) {
+            throw new AssertionError("Expect error") ;
+        }
+
+    }
+
+    @Test
+    public void testCogroupStarLineage1() throws Throwable {
+        planTester.buildPlan("a = load 'a' using PigStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'b' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by *, b by * ;") ;
+        planTester.buildPlan("d = foreach c generate flatten(group), flatten($1), flatten($2);") ;
+        LogicalPlan plan = planTester.buildPlan("e = foreach d generate $0 + 1, a::field1 + 1, field4 + 2.0;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        try {
+            typeValidator.validate(plan, collector) ;
+        }
+        catch (PlanValidationException pve) {
+            //not good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(1);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupStarLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
+        planTester.buildPlan("c = cogroup a by *, b by * ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
+        LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, $1 + 1, $2 + 2.0;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        try {
+            typeValidator.validate(plan, collector) ;
+        }
+        catch (PlanValidationException pve) {
+            //not good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupStarLineageNoSchemaFail() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
+        planTester.buildPlan("c = cogroup a by *, b by * ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
+        LogicalPlan plan = planTester.buildPlan("e = foreach d generate group + 1, $1 + 1, $2 + 2.0;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        try {
+            typeValidator.validate(plan, collector) ;
+            fail("Exception expected") ;
+        }
+        catch (PlanValidationException pve) {
+            //not good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (!collector.hasError()) {
+            throw new AssertionError("Expect error") ;
+        }
+
+    }
+
+    @Test
+    public void testCogroupMultiColumnProjectLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'b' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate group, a.(field1, field2), b.(field4);") ;
+        planTester.buildPlan("e = foreach d generate group, flatten($1), flatten($2);") ;
+        LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, field1 + 1, field4 + 2.0;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        try {
+            typeValidator.validate(plan, collector) ;
+        }
+        catch (PlanValidationException pve) {
+            //not good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupProjectStarLineage() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'b' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate * ;") ;
+        planTester.buildPlan("f = foreach d generate group, flatten(a), flatten(b)  ;") ;
+        LogicalPlan plan = planTester.buildPlan("g = foreach f generate group, field1 + 1, field4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        try {
+            typeValidator.validate(plan, collector) ;
+        }
+        catch (PlanValidationException pve) {
+            //not good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupProjectStarLineageNoSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
+        planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+        planTester.buildPlan("d = foreach c generate * ;") ;
+        planTester.buildPlan("f = foreach d generate group, flatten(a), flatten(b)  ;") ;
+        LogicalPlan plan = planTester.buildPlan("g = foreach f generate group, $1 + 1, $2 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        try {
+            typeValidator.validate(plan, collector) ;
+        }
+        catch (PlanValidationException pve) {
+            //not good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+    }
+
+    @Test
+    public void testCogroupProjectStarLineageMixSchema() throws Throwable {
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
+        planTester.buildPlan("c = cogroup a by field1, b by $0 ;") ;
+        planTester.buildPlan("d = foreach c generate * ;") ;
+        planTester.buildPlan("f = foreach d generate group, flatten(a), flatten(b)  ;") ;
+        LogicalPlan plan = planTester.buildPlan("g = foreach f generate group, field1 + 1, $4 + 2.0  ;") ;
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        try {
+            typeValidator.validate(plan, collector) ;
+        }
+        catch (PlanValidationException pve) {
+            //not good
+        }
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+    }
+
+    @Test
     public void testCogroupLineageFail() throws Throwable {
         planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
         planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
@@ -4168,6 +4613,88 @@
     }
 
     @Test
+    public void testCogroupSortStarLineage() throws Throwable { // checks load-function lineage after an order-by-star on a flattened cogroup
+        planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;
+        planTester.buildPlan("c = cogroup a by field1, b by field4 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("e = order d by * desc;") ;
+        LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, field1 + 1, field4 + 2.0  ;") ;
+        // Script under test: a (BinStorage) and b (PigStorage) are cogrouped, flattened, sorted by *, then field1 and field4 (declared without a type) are used in arithmetic.
+        // Run the type checking validator over the final plan, passing a collector for its messages.
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+        // Diagnostic output through the print helpers.
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+        // The test fails if the collector reports any error.
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        // Inner plan 1 of the leaf foreach is presumably the "field1 + 1" expression (second generate item).
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+        // Locate the projection among the plan roots: use root 0, or root 1 when root 0 is not an LOProject.
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        // The projection's first successor is expected to be an LOCast whose load function is BinStorage (the loader of a).
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+        // Same check on inner plan 2 (presumably "field4 + 2.0"): the cast is expected to carry PigStorage (the loader of b).
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
+    public void testCogroupSortStarLineageNoSchema() throws Throwable { // same lineage check as the schema variant, but the loads declare no schema
+        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
+        planTester.buildPlan("b = load 'a' using PigStorage() ;") ;
+        planTester.buildPlan("c = cogroup a by $0, b by $0 ;") ;
+        planTester.buildPlan("d = foreach c generate group, flatten(a), flatten(b)  ;") ;
+        planTester.buildPlan("e = order d by * desc;") ;
+        LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, $1 + 1, $2 + 2.0  ;") ;
+        // Script under test: a (BinStorage) and b (PigStorage) are loaded without schemas, cogrouped on $0, flattened, sorted by *, then $1 and $2 are used in arithmetic.
+        // Run the type checking validator over the final plan, passing a collector for its messages.
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+        // Diagnostic output through the print helpers.
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+        // The test fails if the collector reports any error.
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        // Inner plan 1 of the leaf foreach is presumably the "$1 + 1" expression (second generate item).
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
+        // Locate the projection among the plan roots: use root 0, or root 1 when root 0 is not an LOProject.
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        // The projection's first successor is expected to be an LOCast whose load function is BinStorage, i.e. the test expects $1 to trace back to a.
+        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.BinStorage"));
+        // Same check on inner plan 2 (presumably "$2 + 2.0"): the test expects the cast to carry PigStorage, i.e. $2 to trace back to b.
+        foreachPlan = foreach.getForEachPlans().get(2);
+        exOp = foreachPlan.getRoots().get(0);
+        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
+        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
+        assertTrue(cast.getLoadFunc().toString().startsWith("org.apache.pig.builtin.PigStorage"));
+
+    }
+
+    @Test
     public void testCrossLineage() throws Throwable {
         planTester.buildPlan("a = load 'a' using BinStorage() as (field1, field2: float, field3: chararray );") ;
         planTester.buildPlan("b = load 'a' using PigStorage() as (field4, field5, field6: chararray );") ;