You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/02 22:36:50 UTC

svn commit: r701235 [1/3] - in /incubator/pig/branches/types: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/impl/logicalLayer/...

Author: olga
Date: Thu Oct  2 13:36:49 2008
New Revision: 701235

URL: http://svn.apache.org/viewvc?rev=701235&view=rev
Log:
PIG-335: lineage

Modified:
    incubator/pig/branches/types/CHANGES.txt
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOIsNull.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java

Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Thu Oct  2 13:36:49 2008
@@ -267,3 +267,5 @@
     PIG-54: MIN/MAX don't deal with invalid data (pradeepk via olgan)
 
     PIG-470: TextLoader should produce bytearrays (sms via olgan)
+
+    PIG-335: lineage (sms via olgan)

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Oct  2 13:36:49 2008
@@ -1150,7 +1150,12 @@
         ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op
                 .getExpression());
         physOp.setResultType(op.getType());
-        ((POCast) physOp).setLoadFSpec(load.getClass().getName());
+        LoadFunc lf = op.getLoadFunc();
+        String lfString = null;
+        if(null != lf) {
+            lfString = lf.getClass().getName();
+            ((POCast) physOp).setLoadFSpec(lfString);
+        }
         try {
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
@@ -1159,6 +1164,7 @@
         }
 
     }
+
     @Override
     public void visit(LOLimit limit) throws VisitorException {
             String scope = limit.getOperatorKey().scope;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Thu Oct  2 13:36:49 2008
@@ -44,7 +44,7 @@
  * Need the full operator implementation.
  */
 public class POCast extends ExpressionOperator {
-    private String loadFSpec;
+    private String loadFSpec = null;
 	transient private LoadFunc load;
 	private Log log = LogFactory.getLog(getClass());
     private boolean castNotNeeded = false;
@@ -63,7 +63,9 @@
     
     private void instantiateFunc() {
         if(load!=null) return;
-        this.load = (LoadFunc) PigContext.instantiateFuncFromSpec(this.loadFSpec);
+        if(this.loadFSpec != null) {
+            this.load = (LoadFunc) PigContext.instantiateFuncFromSpec(this.loadFSpec);
+        }
     }
     
     public void setLoadFSpec(String fSpec) {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java Thu Oct  2 13:36:49 2008
@@ -84,8 +84,7 @@
      *             if there is already a schema and the existing schema cannot
      *             be reconciled with this new schema.
      */
-    public final void setFieldSchema(Schema.FieldSchema fs) throws FrontendException {
-		log.debug("Inside setFieldSchema");
+    public void setFieldSchema(Schema.FieldSchema fs) throws FrontendException {
         mFieldSchema = fs;
         setAlias(fs.alias);
         setType(fs.type);

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java Thu Oct  2 13:36:49 2008
@@ -53,9 +53,11 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
             mFieldSchema = new Schema.FieldSchema(null, DataType.mergeType(getLhsOperand().getType(), getRhsOperand().getType()));
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
             mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java Thu Oct  2 13:36:49 2008
@@ -53,10 +53,12 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
-            Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema = fs;
+            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+            mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java Thu Oct  2 13:36:49 2008
@@ -18,6 +18,7 @@
 
 package org.apache.pig.impl.logicalLayer;
 
+import org.apache.pig.LoadFunc;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
@@ -30,6 +31,7 @@
 
     private static final long serialVersionUID = 2L;
     private ExpressionOperator mExpr;
+    private LoadFunc mLoadFunc = null;
 
     /**
      * 
@@ -66,14 +68,8 @@
     @Override
     public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
-            if(DataType.isAtomic(mType)) {
-                mFieldSchema = new Schema.FieldSchema(null, mType);
-                if (mExpr.getFieldSchema() != null) {
-                    mFieldSchema.canonicalName =
-                        mExpr.getFieldSchema().canonicalName;
-                }
-                mIsFieldSchemaComputed = true;
-            }
+            mFieldSchema = new Schema.FieldSchema(null, mType);
+            mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;
     }
@@ -88,4 +84,12 @@
         return false;
     }
 
+    public LoadFunc getLoadFunc() {
+        return mLoadFunc;
+    }
+
+    public void setLoadFunc(LoadFunc loadFunc) {
+        mLoadFunc = loadFunc;
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Thu Oct  2 13:36:49 2008
@@ -167,6 +167,7 @@
              */
             int arity = mGroupByPlans.get(inputs.get(0)).size();
             for (int i = 0; i < arity; ++i) {
+                Schema.FieldSchema groupByFs;
                 Collection<String> cAliases = positionAlias.get(i);
                 if(null != cAliases) {
                     Object[] aliases = cAliases.toArray();
@@ -181,11 +182,14 @@
                                     if(!aliasLookup.get(alias)) {
                                         Schema.FieldSchema fs = eOp.getFieldSchema();
                                         if(null != fs) {
-                                            groupByFss.add(new Schema.FieldSchema(alias, fs.schema, fs.type));
+                                            groupByFs = new Schema.FieldSchema(alias, fs.schema, fs.type);
+                                            groupByFss.add(groupByFs);
                                             aliasLookup.put(alias, true);
                                         } else {
-                                            groupByFss.add(new Schema.FieldSchema(alias, null, DataType.BYTEARRAY));
+                                            groupByFs = new Schema.FieldSchema(alias, null, DataType.BYTEARRAY);
+                                            groupByFss.add(groupByFs);
                                         }
+                                        setFieldSchemaParent(groupByFs, positionOperators, i);
                                         break;
                                     } else {
                                         if(j < aliases.length) {
@@ -195,10 +199,19 @@
                                             //just add the schema of the expression operator with the null alias
                                             Schema.FieldSchema fs = eOp.getFieldSchema();
                                             if(null != fs) {
-                                                groupByFss.add(new Schema.FieldSchema(null, fs.schema, fs.type));
+                                                groupByFs = new Schema.FieldSchema(null, fs.schema, fs.type);
+                                                groupByFss.add(groupByFs);
+                                                for(ExpressionOperator op: cEops) {
+                                                    Schema.FieldSchema opFs = op.getFieldSchema();
+                                                    if(null != opFs) {
+                                                        groupByFs.setParent(opFs.canonicalName, eOp);
+                                                    }
+                                                }
                                             } else {
-                                                groupByFss.add(new Schema.FieldSchema(null, null, DataType.BYTEARRAY));
+                                                groupByFs = new Schema.FieldSchema(null, null, DataType.BYTEARRAY);
+                                                groupByFss.add(groupByFs);
                                             }
+                                            setFieldSchemaParent(groupByFs, positionOperators, i);
                                             break;
                                         }
                                     }
@@ -218,10 +231,10 @@
                 } else {
                     //We do not have any alias for this position in the group by columns
                     //We have positions $1, $2, etc.
-                    groupByFss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+                    groupByFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                    groupByFss.add(groupByFs);
+                    setFieldSchemaParent(groupByFs, positionOperators, i);
                 }
-                //The schema for these columns is the merged schema of the expression operatore
-                //This part is handled in the type checker
             }            
 
             groupBySchema = new Schema(groupByFss);
@@ -229,7 +242,9 @@
             if(1 == arity) {
                 byte groupByType = getAtomicGroupByType();
                 Schema groupSchema = groupByFss.get(0).schema;
-                fss.add(new Schema.FieldSchema("group", groupSchema, groupByType));
+                Schema.FieldSchema groupByFs = new Schema.FieldSchema("group", groupSchema, groupByType);
+                setFieldSchemaParent(groupByFs, positionOperators, 0);
+                fss.add(groupByFs);
             } else {
                 Schema mergedGroupSchema = getTupleGroupBySchema();
                 if(mergedGroupSchema.size() != groupBySchema.size()) {
@@ -255,9 +270,10 @@
             }
             for (LogicalOperator op : inputs) {
                 try {
-                    Schema cSchema = op.getSchema();
-                    fss.add(new Schema.FieldSchema(op.getAlias(), op
-                            .getSchema(), DataType.BAG));
+                    Schema.FieldSchema bagFs = new Schema.FieldSchema(op.getAlias(),
+                            op.getSchema(), DataType.BAG);
+                    fss.add(bagFs);
+                    setFieldSchemaParent(bagFs, op);
                 } catch (FrontendException ioe) {
                     mIsSchemaComputed = false;
                     mSchema = null;
@@ -374,12 +390,35 @@
 
             for(int j=0;j < innerPlans.size(); j++) {
                 byte innerType = innerPlans.get(j).getSingleLeafPlanOutputType() ;
-                fsList.get(j).type = DataType.mergeType(fsList.get(j).type,
-                                                        innerType) ;
+                ExpressionOperator eOp = (ExpressionOperator)innerPlans.get(j).getSingleLeafPlanOutputOp();
+                Schema.FieldSchema groupFs = fsList.get(j);
+                groupFs.type = DataType.mergeType(groupFs.type, innerType) ;
+                groupFs.setParent(eOp.getFieldSchema().canonicalName, eOp);
             }
         }
 
         return new Schema(fsList) ;
     }
 
+    private void setFieldSchemaParent(Schema.FieldSchema fs, MultiMap<Integer, ExpressionOperator> positionOperators, int position) throws FrontendException {
+        for(ExpressionOperator op: positionOperators.get(position)) {
+            Schema.FieldSchema opFs = op.getFieldSchema();
+            if(null != opFs) {
+                fs.setParent(opFs.canonicalName, op);
+            }
+        }
+    }
+
+    private void setFieldSchemaParent(Schema.FieldSchema fs, LogicalOperator op) throws FrontendException {
+        Schema s = op.getSchema();
+        if(null != s) {
+            for(Schema.FieldSchema inputFs: s.getFields()) {
+                if(null != inputFs) {
+                    fs.setParent(inputFs.canonicalName, op);
+                }
+            }
+        } else {
+            fs.setParent(null, op);
+        }
+    }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java Thu Oct  2 13:36:49 2008
@@ -67,6 +67,7 @@
             for (LogicalOperator op : inputs) {
                 String opAlias = op.getAlias();
                 Schema s = op.getSchema();
+                Schema.FieldSchema newFs;
 
                 //need to extract the children and create the aliases
                 //assumption here is that flatten is only for one column
@@ -78,7 +79,7 @@
                         log.debug("fs.alias: " + fs.alias);
                         if(null != fs.alias) {
                             String disambiguatorAlias = opAlias + "::" + fs.alias;
-                            Schema.FieldSchema newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
+                            newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
                             fss.add(newFs);
                             Integer count;
                             count = aliases.get(fs.alias);
@@ -99,9 +100,10 @@
                             //we just need to record if its due to
                             //flattening
                         } else {
-                            Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                            newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
                             fss.add(newFs);
                         }
+                        newFs.setParent(fs.canonicalName, op);
                     }
                 } else {
                     mSchema = null;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java Thu Oct  2 13:36:49 2008
@@ -64,7 +64,7 @@
                     throw new FrontendException("Could not find operator in plan");
                 }
                 if(op instanceof ExpressionOperator) {
-                    Schema.FieldSchema fs = ((ExpressionOperator)op).getFieldSchema();
+                    Schema.FieldSchema fs = new Schema.FieldSchema(((ExpressionOperator)op).getFieldSchema());
                     if(DataType.isSchemaType(fs.type)) {
                         mSchema = fs.schema;
                     } else {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java Thu Oct  2 13:36:49 2008
@@ -53,9 +53,11 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
             mFieldSchema = new Schema.FieldSchema(null, DataType.mergeType(getLhsOperand().getType(), getRhsOperand().getType()));
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
             mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java Thu Oct  2 13:36:49 2008
@@ -53,10 +53,12 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
-            Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema = fs;
+            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+            mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java Thu Oct  2 13:36:49 2008
@@ -66,7 +66,7 @@
             ArrayList<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
             try {
                 if(input instanceof ExpressionOperator) {
-                    Schema.FieldSchema fs = ((ExpressionOperator)input).getFieldSchema();
+                    Schema.FieldSchema fs = new Schema.FieldSchema(((ExpressionOperator)input).getFieldSchema());
                     if(DataType.isSchemaType(fs.type)) {
                         mSchema = fs.schema;
                     } else {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Thu Oct  2 13:36:49 2008
@@ -172,6 +172,7 @@
                                     Schema.FieldSchema fs;
                                     try {
                                         fs = new Schema.FieldSchema(s.getField(i));
+                                        fs.setParent(s.getField(i).canonicalName, op);
                                     } catch (ParseException pe) {
                                         throw new FrontendException(pe.getMessage());
                                     }
@@ -195,13 +196,23 @@
 									if((null != outerCanonicalAlias) && (null != innerCanonicalAlias)) {
 										String disambiguatorAlias = outerCanonicalAlias + "::" + innerCanonicalAlias;
 										newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
-										fss.add(newFs);
+                                        try {
+                                            newFs.setParent(s.getField(i).canonicalName, op);
+										} catch (ParseException pe) {
+                                            throw new FrontendException(pe.getMessage());
+                                        }
+                                        fss.add(newFs);
                                         updateAliasCount(aliases, disambiguatorAlias);
 										//it's fine if there are duplicates
 										//we just need to record if its due to
 										//flattening
 									} else {
-										newFs = new Schema.FieldSchema(fs.alias, fs.schema, fs.type);
+										newFs = new Schema.FieldSchema(fs);
+                                        try {
+                                            newFs.setParent(s.getField(i).canonicalName, op);
+										} catch (ParseException pe) {
+                                            throw new FrontendException(pe.getMessage());
+                                        }
 										fss.add(newFs);
 									}
                                     updateAliasCount(aliases, innerCanonicalAlias);
@@ -225,10 +236,13 @@
                                         }
                                         updateAliasCount(aliases, newFs.alias);
                                         fss.add(newFs);
+                                        newFs.setParent(null, op);
                                     } else {
                                         for(Schema.FieldSchema ufs: userDefinedSchema.getFields()) {
                                             QueryParser.SchemaUtils.setFieldSchemaDefaultType(ufs, DataType.BYTEARRAY);
-                                            fss.add(new Schema.FieldSchema(ufs.alias, ufs.schema, ufs.type));
+                                            newFs = new Schema.FieldSchema(ufs);
+                                            fss.add(newFs);
+                                            newFs.setParent(null, op);
                                             updateAliasCount(aliases, ufs.alias);
                                         }
                                     }
@@ -239,6 +253,7 @@
 								        newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
                                     }
                                     fss.add(newFs);
+                                    newFs.setParent(null, op);
                                 }
 							}
 						} else {
@@ -262,14 +277,17 @@
                         String outerCanonicalAlias = null;
                         if(null != userDefinedSchema) {
                             try {
-                                Schema.FieldSchema userDefinedFieldSchema = userDefinedSchema.getField(0);
+                                Schema.FieldSchema userDefinedFieldSchema = new Schema.FieldSchema(userDefinedSchema.getField(0));
                                 fss.add(userDefinedFieldSchema);
+                                userDefinedFieldSchema.setParent(null, op);
                                 updateAliasCount(aliases, userDefinedFieldSchema.alias);
                             } catch (ParseException pe) {
                                 throw new FrontendException(pe.getMessage());
                             }
                         } else {
-						    fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+                            Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+						    fss.add(newFs);
+                            newFs.setParent(null, op);
                         }
 					}
                 } catch (FrontendException fee) {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java Thu Oct  2 13:36:49 2008
@@ -131,7 +131,7 @@
                     throw new FrontendException("Could not find operator in plan");
                 }
                 if(op instanceof ExpressionOperator) {
-                    fss.add(((ExpressionOperator)op).getFieldSchema());
+                    fss.add(new Schema.FieldSchema(((ExpressionOperator)op).getFieldSchema()));
                     mSchema = new Schema(fss);
                 } else {
                     mSchema = op.getSchema();

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java Thu Oct  2 13:36:49 2008
@@ -53,10 +53,12 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
-            Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema = fs;
+            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+            mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java Thu Oct  2 13:36:49 2008
@@ -53,10 +53,12 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
-            Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema = fs;
+            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+            mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOIsNull.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOIsNull.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOIsNull.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOIsNull.java Thu Oct  2 13:36:49 2008
@@ -51,10 +51,11 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
-            Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema = fs;
+            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+            mFieldSchema.setParent(getOperand().getFieldSchema().canonicalName, getOperand());
+            mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java Thu Oct  2 13:36:49 2008
@@ -53,10 +53,12 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
-            Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema = fs;
+            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+            mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java Thu Oct  2 13:36:49 2008
@@ -53,10 +53,12 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
-            Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema = fs;
+            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+            mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java Thu Oct  2 13:36:49 2008
@@ -108,6 +108,7 @@
             } else {
                 mFieldSchema = new Schema.FieldSchema(null, mValueType);
             }
+            mFieldSchema.setParent(mMap.getFieldSchema().canonicalName, mMap);
 
             mIsFieldSchemaComputed = true;
         }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java Thu Oct  2 13:36:49 2008
@@ -53,9 +53,11 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
             mFieldSchema = new Schema.FieldSchema(null, DataType.mergeType(getLhsOperand().getType(), getRhsOperand().getType()));
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
             mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java Thu Oct  2 13:36:49 2008
@@ -53,9 +53,11 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
             mFieldSchema = new Schema.FieldSchema(null, DataType.mergeType(getLhsOperand().getType(), getRhsOperand().getType()));
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
             mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java Thu Oct  2 13:36:49 2008
@@ -47,9 +47,10 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
-        if(!mIsSchemaComputed) {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
+        if(!mIsFieldSchemaComputed) {
             mFieldSchema = new Schema.FieldSchema(null, getOperand().getType());
+            mFieldSchema.setParent(getOperand().getFieldSchema().canonicalName, getOperand());
             mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java Thu Oct  2 13:36:49 2008
@@ -51,10 +51,11 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
-            Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema = fs;
+            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+            mFieldSchema.setParent(getOperand().getFieldSchema().canonicalName, getOperand());
+            mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java Thu Oct  2 13:36:49 2008
@@ -53,10 +53,12 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
-            Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema = fs;
+            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+            mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java Thu Oct  2 13:36:49 2008
@@ -53,10 +53,12 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
-            Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema = fs;
+            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+            mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java Thu Oct  2 13:36:49 2008
@@ -194,7 +194,9 @@
                                 + expressionOperator.getClass().getName() + " " + expressionOperator);
                         if(!mSentinel) {
-                            //we have an expression operator and hence a list of field shcemas
+                            //we have an expression operator and hence a list of field schemas
-                            mFieldSchema = ((ExpressionOperator)expressionOperator).getFieldSchema();
+                            Schema.FieldSchema fs = ((ExpressionOperator)expressionOperator).getFieldSchema();
+                            mFieldSchema = new Schema.FieldSchema(fs);
+                            mFieldSchema.setParent(fs.canonicalName, expressionOperator);
                         } else {
                             //we have a relational operator as input and hence a schema
                             log.debug("expression operator alias: " + expressionOperator.getAlias());
@@ -204,6 +206,7 @@
                             //the type of the operator will be unknown. when type checking is in place
                             //add the type of the operator as a parameter to the fieldschema creation
                             mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema(), DataType.TUPLE);
+                            mFieldSchema.setParent(null, expressionOperator);
                             //mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema());
                         }
                     } else {
@@ -233,23 +236,30 @@
                                 if(null != expOpFs) {
                                     Schema s = expOpFs.schema;
                                     if(null != s) {
-                                        mFieldSchema = new Schema.FieldSchema(s.getField(mProjection.get(0)));
+                                        Schema.FieldSchema fs = s.getField(mProjection.get(0));
+                                        mFieldSchema = new Schema.FieldSchema(fs);
+                                        mFieldSchema.setParent(fs.canonicalName, expressionOperator);
                                     } else {
                                         mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                                        mFieldSchema.setParent(expOpFs.canonicalName, expressionOperator);
                                     }
                                 } else {
                                     mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                                    mFieldSchema.setParent(null, expressionOperator);
                                 }
                             } else {
                                 log.debug("Input is a logical operator");
-                                   Schema s = expressionOperator.getSchema();
+                                Schema s = expressionOperator.getSchema();
                                 log.debug("s: " + s);
                                 if(null != s) {
-                                    mFieldSchema = new Schema.FieldSchema(s.getField(mProjection.get(0)));
+                                    Schema.FieldSchema fs = s.getField(mProjection.get(0));
+                                    mFieldSchema = new Schema.FieldSchema(fs);
+                                    mFieldSchema.setParent(fs.canonicalName, expressionOperator);
                                     log.debug("mFieldSchema alias: " + mFieldSchema.alias);
                                     log.debug("mFieldSchema schema: " + mFieldSchema.schema);
                                 } else {
                                     mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                                    mFieldSchema.setParent(null, expressionOperator);
                                 }
                                 mType = mFieldSchema.type ;
                             }
@@ -259,6 +269,7 @@
                         
                         for (int colNum : mProjection) {
                             log.debug("Col: " + colNum);
+                            Schema.FieldSchema fs;
                             if(!mSentinel) {
                                 Schema.FieldSchema expOpFs = ((ExpressionOperator)expressionOperator).getFieldSchema();
                                 if(null != expOpFs) {
@@ -266,22 +277,36 @@
                                     log.debug("Schema s: " + s);
                                     if(null != s) {
                                         if(colNum < s.size()) {
-                                            fss.add(new Schema.FieldSchema(s.getField(colNum)));
+                                            Schema.FieldSchema parentFs = s.getField(colNum);
+                                            fs = new Schema.FieldSchema(parentFs);
+                                            fss.add(fs);
+                                            fs.setParent(parentFs.canonicalName, expressionOperator);
                                         } else {
-                                            fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+                                            fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                                            fss.add(fs);
+                                            fs.setParent(expOpFs.canonicalName, expressionOperator);
                                         }
                                     } else {
-                                        fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+                                        fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                                        fss.add(fs);
+                                        fs.setParent(expOpFs.canonicalName, expressionOperator);
                                     }
                                 } else {
+                                    fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-                                    fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+                                    fss.add(fs); // add the very instance whose parent is recorded below
+                                    fs.setParent(null, expressionOperator);
                                 }
                             } else {
                                 Schema s = expressionOperator.getSchema();
                                 if(null != s) {
-                                    fss.add(new Schema.FieldSchema(s.getField(colNum)));
+                                    Schema.FieldSchema parentFs = s.getField(colNum);
+                                    fs = new Schema.FieldSchema(parentFs);
+                                    fss.add(fs);
+                                    fs.setParent(parentFs.canonicalName, expressionOperator);
                                 } else {
-                                    fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+                                    fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                                    fss.add(fs);
+                                    fs.setParent(null, expressionOperator);
                                 }
                             }
                         }
@@ -296,6 +321,7 @@
                     throw new FrontendException(pe.getMessage());
                 }
                 mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), new Schema(fss));
+                mFieldSchema.setParent(null, expressionOperator);
                 mIsFieldSchemaComputed = true;
                 log.debug("mIsStar is false, returning computed field schema of expressionOperator");
             }
@@ -311,6 +337,7 @@
             if(!DataType.isSchemaType(mType)) {
                 Schema pjSchema = new Schema(mFieldSchema);
                 mFieldSchema = new Schema.FieldSchema(getAlias(), pjSchema, DataType.TUPLE);
+                mFieldSchema.setParent(null, expressionOperator);
             } else {
                 mFieldSchema.type = DataType.TUPLE;
             }
@@ -322,6 +349,7 @@
                 if(!DataType.isSchemaType(mType)) {
                     Schema pjSchema = new Schema(mFieldSchema);
                     mFieldSchema = new Schema.FieldSchema(getAlias(), pjSchema, DataType.BAG);
+                    mFieldSchema.setParent(null, expressionOperator);
                 } else {
                     mFieldSchema.type = DataType.BAG;
                 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java Thu Oct  2 13:36:49 2008
@@ -93,10 +93,12 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
-            Schema.FieldSchema fs = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema = fs;
+            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
+            mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java Thu Oct  2 13:36:49 2008
@@ -129,7 +129,7 @@
                     throw new FrontendException("Could not find operator in plan");
                 }
                 if(op instanceof ExpressionOperator) {
-                    Schema.FieldSchema fs = ((ExpressionOperator)op).getFieldSchema();
+                    Schema.FieldSchema fs = new Schema.FieldSchema(((ExpressionOperator)op).getFieldSchema());
                     if(DataType.isSchemaType(fs.type)) {
                         mSchema = fs.schema;
                     } else {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java Thu Oct  2 13:36:49 2008
@@ -53,9 +53,11 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
             mFieldSchema = new Schema.FieldSchema(null, DataType.mergeType(getLhsOperand().getType(), getRhsOperand().getType()));
+            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
+            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
             mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java Thu Oct  2 13:36:49 2008
@@ -68,6 +68,22 @@
                         mSchema = op.getSchema();
                     }
                 }
+                if(null != mSchema) {
+                    for(Schema.FieldSchema fs: mSchema.getFields()) {
+                        iter = s.iterator();
+                        while(iter.hasNext()) {
+                            op = iter.next();
+                            Schema opSchema = op.getSchema();
+                            if(null != opSchema) { // guard this input's schema, not the predecessor list
+                                for(Schema.FieldSchema opFs: opSchema.getFields()) {
+                                    fs.setParent(opFs.canonicalName, op);
+                                }
+                            } else {
+                                fs.setParent(null, op);
+                            }
+                        }
+                    }
+                }
                 mIsSchemaComputed = true;
             } catch (FrontendException fe) {
                 mSchema = null;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java Thu Oct  2 13:36:49 2008
@@ -80,35 +80,37 @@
 
     @Override
     public Schema.FieldSchema getFieldSchema() throws FrontendException {
-        Schema inputSchema = new Schema();
-        for(ExpressionOperator op: mArgs) {
-            if (!DataType.isUsableType(op.getType())) {
-                String msg = "Problem with input: " + op + " of User-defined function: " + this ;
-                mFieldSchema = null;
-                mIsFieldSchemaComputed = false;
-                throw new FrontendException(msg) ;
+        if(!mIsFieldSchemaComputed) {
+            Schema inputSchema = new Schema();
+            for(ExpressionOperator op: mArgs) {
+                if (!DataType.isUsableType(op.getType())) {
+                    String msg = "Problem with input: " + op + " of User-defined function: " + this ;
+                    mFieldSchema = null;
+                    mIsFieldSchemaComputed = false;
+                    throw new FrontendException(msg) ;
+                }
+                inputSchema.add(op.getFieldSchema());    
             }
-            inputSchema.add(op.getFieldSchema());    
-        }
-
-        EvalFunc<?> ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(mFuncSpec);
-        Schema udfSchema = ef.outputSchema(inputSchema);
-
-        if (null != udfSchema) {
-            Schema.FieldSchema fs;
-            try {
-                fs = new Schema.FieldSchema(udfSchema.getField(0));
-            } catch (ParseException pe) {
-                throw new FrontendException(pe.getMessage());
+    
+            EvalFunc<?> ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(mFuncSpec);
+            Schema udfSchema = ef.outputSchema(inputSchema);
+    
+            if (null != udfSchema) {
+                Schema.FieldSchema fs;
+                try {
+                    fs = new Schema.FieldSchema(udfSchema.getField(0));
+                } catch (ParseException pe) {
+                    throw new FrontendException(pe.getMessage());
+                }
+                setType(fs.type);
+                mFieldSchema = fs;
+                mIsFieldSchemaComputed = true;
+            } else {
+                byte returnType = DataType.findType(ef.getReturnType());
+                setType(returnType);
+                mFieldSchema = new Schema.FieldSchema(null, null, returnType);
+                mIsFieldSchemaComputed = true;
             }
-            setType(fs.type);
-            mFieldSchema = fs;
-            mIsFieldSchemaComputed = true;
-        } else {
-            byte returnType = DataType.findType(ef.getReturnType());
-            setType(returnType);
-            mFieldSchema = new Schema.FieldSchema(null, null, returnType);
-            mIsFieldSchemaComputed = true;
         }
         return mFieldSchema;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Thu Oct  2 13:36:49 2008
@@ -167,11 +167,6 @@
      */
     public void setCanonicalNames() {
         for (Schema.FieldSchema fs : mSchema.getFields()) {
-            if (fs.canonicalName != null) {
-                throw new RuntimeException("Attempt to rename field " +
-                        fs.alias + " in operator " + name() + " that " +
-                    "already has canonical name "  + fs.canonicalName);
-            }
             fs.canonicalName = CanonicalNamer.getNewName();
         }
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Thu Oct  2 13:36:49 2008
@@ -36,6 +36,11 @@
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 
 /**
  * A visitor to discover if any schema has been specified for a file being
@@ -129,6 +134,17 @@
                     p.connect(proj, cast);
                     
                     cast.setFieldSchema(fs.clone());
+                    LoadFunc loadFunc = null;
+                    if(lo instanceof LOLoad) {
+                        loadFunc = ((LOLoad)lo).getLoadFunc();
+                    } else if (lo instanceof LOStream) {
+                        StreamingCommand command = ((LOStream)lo).getStreamingCommand();
+                        HandleSpec streamOutputSpec = command.getOutputSpec(); 
+                        loadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(streamOutputSpec.getSpec());
+                    } else {
+                        throw new OptimizerException("TypeCastInserter invoked with an invalid operator class name:" + lo.getClass().getSimpleName());
+                    }
+                    cast.setLoadFunc(loadFunc);
                     typeChanges.put(fs.canonicalName, fs.type);
                     // Reset the loads field schema to byte array so that it
                     // will reflect reality.

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Oct  2 13:36:49 2008
@@ -913,7 +913,6 @@
             {
                 SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY); 
                 op.setSchema(schema); 
-                op.setCanonicalNames(); 
                 log.debug("Load as schema" + schema);
             } 
         |   fs = AtomSchema() 
@@ -949,7 +948,6 @@
             {
                 SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY); 
                 op.setSchema(schema); 
-                op.setCanonicalNames(); 
                 log.debug("Stream as schema()"+ schema);
             } 
         | fs = AtomSchema() 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Thu Oct  2 13:36:49 2008
@@ -32,9 +32,25 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.CanonicalNamer;
 
+/**
+ * The Schema class encapsulates the notion of a schema for a relational operator.
+ * A schema is a list of columns that describe the output of a relational operator.
+ * Each column in the relation is represented as a FieldSchema, a static class inside
+ * the Schema. A column by definition has an alias, a type and a possible schema (if the
+ * column is a bag or a tuple). In addition, each column in the schema has a unique
+ * auto generated name used for tracking the lineage of the column in a sequence of
+ * statements.
+ *
+ * The lineage of the column is tracked using a map of the predecessors' columns to
+ * the operators that generate the predecessor columns. The predecessor columns are the
+ * columns required in order to generate the column under consideration.  Similarly, a
+ * reverse lookup of operators that generate the predecessor column to the predecessor
+ * column is maintained.
+ */
 
 public class Schema implements Serializable, Cloneable {
 
@@ -71,7 +87,7 @@
          */
         public String canonicalName = null;
 
-        /**
+        /*
          * Map of canonical names used for this field in other sections of the
          * plan.  It can occur that a single field will have different
          * canonical names in different branches of a plan.  For example, 
@@ -79,10 +95,25 @@
          * column will have canonical name, say, of 'r'.  But in branches
          * above the cogroup it may have been known as 's' in the A branch and
          * 't' in the B branch.  This map preserves that.  The key is a
-         * logical operator's key, and the value is the canonical name
+         * logical operator, and the value is the canonical name
          * associated with the field for that operator.
          */
-        public Map<OperatorKey, String> canonicalMap = null;
+        private Map<String, LogicalOperator> canonicalMap = null;
+
+        /**
+         * A reverse lookup of canonical names to logical operators. The reverse
+         * lookup serves cases where the canonical name of the predecessor
+         * cannot be determined. In such cases the keys of the reverse lookup
+         * can be used to navigate the plan
+         */
+        private MultiMap<LogicalOperator, String> reverseCanonicalMap = null;
+        
+        /**
+         * Canonical namer object to generate new canonical names on
+         * request. In order to ensure unique and consistent names, across
+         * all field schema objects, the object is made static.
+         */
+        public static CanonicalNamer canonicalNamer = new CanonicalNamer();
         
         private static Log log = LogFactory.getLog(Schema.FieldSchema.class);
 
@@ -99,6 +130,9 @@
             alias = a;
             type = t;
             schema = null;            
+            canonicalName = canonicalNamer.getNewName();
+            canonicalMap = new HashMap<String, LogicalOperator>();
+            reverseCanonicalMap = new MultiMap<LogicalOperator, String>();
         }
 
         /**
@@ -113,6 +147,9 @@
             alias = a;
             type = DataType.TUPLE;
             schema = s;
+            canonicalName = canonicalNamer.getNewName();
+            canonicalMap = new HashMap<String, LogicalOperator>();
+            reverseCanonicalMap = new MultiMap<LogicalOperator, String>();
         }
 
         /**
@@ -131,11 +168,14 @@
             alias = a;
             schema = s;
             log.debug("t: " + t + " Bag: " + DataType.BAG + " tuple: " + DataType.TUPLE);
-            if ((null != s) && (t != DataType.BAG) && (t != DataType.TUPLE)) {
+            if ((null != s) && !(DataType.isSchemaType(t))) {
                 throw new FrontendException("Only a BAG or TUPLE can have schemas. Got "
                         + DataType.findTypeName(t));
             }
             type = t;
+            canonicalName = canonicalNamer.getNewName();
+            canonicalMap = new HashMap<String, LogicalOperator>();
+            reverseCanonicalMap = new MultiMap<LogicalOperator, String>();
         }
 
         /**
@@ -159,9 +199,27 @@
                 schema = null;
                 type = DataType.UNKNOWN;
             }
+            canonicalName = canonicalNamer.getNewName();
+            canonicalMap = new HashMap<String, LogicalOperator>();
+            reverseCanonicalMap = new MultiMap<LogicalOperator, String>();
         }
 
-        /***
+        /** Records parent as a producer of this field: always in the reverse lookup,
+         *  and in the canonical map too when parentCanonicalName is not null. */
+        public void setParent(String parentCanonicalName, LogicalOperator parent) {
+            if(parentCanonicalName != null) canonicalMap.put(parentCanonicalName, parent);
+            reverseCanonicalMap.put(parent, parentCanonicalName);
+        }
+
+        /** Returns this field's own map (not a copy) from parent canonical
+         *  names to the logical operators recorded for them. */
+        public Map<String, LogicalOperator> getCanonicalMap() { return canonicalMap; }
+
+        /** Returns this field's own reverse lookup (not a copy) from logical
+         *  operators to the parent canonical names recorded for them. */
+        public MultiMap<LogicalOperator, String> getReverseCanonicalMap() { return reverseCanonicalMap; }
+
+        /**
          *  Two field schemas are equal if types and schemas
          *  are equal in all levels.
          *
@@ -350,7 +408,7 @@
                 fs.canonicalName = canonicalName;
                 if (canonicalMap != null) {
                     fs.canonicalMap =
-                        new HashMap<OperatorKey, String>(canonicalMap);
+                        new HashMap<String, LogicalOperator>(canonicalMap);
                 }
                 return fs;
             } catch (FrontendException fe) {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=701235&r1=701234&r2=701235&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu Oct  2 13:36:49 2008
@@ -25,9 +25,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
+import java.util.HashSet;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.LoadFunc;
 import org.apache.pig.Algebraic;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -44,6 +46,9 @@
 import org.apache.pig.impl.plan.*;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.data.DataType ;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -359,7 +364,7 @@
         }
     }
 
-    private void insertCastForRegexp(LORegexp rg) {
+    private void insertCastForRegexp(LORegexp rg) throws VisitorException {
         LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
         collectCastWarning(rg, DataType.BYTEARRAY, DataType.CHARARRAY) ;
         OperatorKey newKey = genNewOperatorKey(rg) ;
@@ -376,6 +381,7 @@
             throw err ;
         }
         rg.setOperand(cast) ;
+        this.visit(cast);
     }
 
     public void visit(LOAnd binOp) throws VisitorException {
@@ -1055,7 +1061,7 @@
     }
 
     private void insertLeftCastForBinaryOp(BinaryExpressionOperator binOp,
-                                           byte toType ) {
+                                           byte toType ) throws VisitorException {
         LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
         collectCastWarning(binOp,
                            binOp.getLhsOperand().getType(),
@@ -1075,10 +1081,11 @@
             throw err ;
         }
         binOp.setLhsOperand(cast) ;
+        this.visit(cast);
     }
 
     private void insertRightCastForBinaryOp(BinaryExpressionOperator binOp,
-                                            byte toType ) {
+                                            byte toType ) throws VisitorException {
         LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
         collectCastWarning(binOp,
                            binOp.getRhsOperand().getType(),
@@ -1098,6 +1105,7 @@
             throw err ;
         }
         binOp.setRhsOperand(cast) ;
+        this.visit(cast);
     }
 
     /**
@@ -1139,7 +1147,7 @@
 
     }
 
-    private void insertCastForUniOp(UnaryExpressionOperator uniOp, byte toType) {
+    private void insertCastForUniOp(UnaryExpressionOperator uniOp, byte toType) throws VisitorException {
         collectCastWarning(uniOp,
                            uniOp.getOperand().getType(),
                            toType) ;
@@ -1164,6 +1172,8 @@
             throw err ;
         }
 
+        this.visit(cast);
+
     }
     
     // Currently there is no input type information support in UserFunc
@@ -1507,7 +1517,7 @@
 
     }
 
-    private void insertLeftCastForBinCond(LOBinCond binCond, byte toType) {
+    private void insertLeftCastForBinCond(LOBinCond binCond, byte toType) throws VisitorException {
         LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
 
         collectCastWarning(binCond,
@@ -1528,10 +1538,11 @@
             throw err ;
         } 
         binCond.setLhsOp(cast) ;
+        this.visit(cast);
 
     }
 
-    private void insertRightCastForBinCond(LOBinCond binCond, byte toType) {
+    private void insertRightCastForBinCond(LOBinCond binCond, byte toType) throws VisitorException {
         LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
 
         collectCastWarning(binCond,
@@ -1552,6 +1563,7 @@
             throw err ;
         }               
         binCond.setRhsOp(cast) ;
+        this.visit(cast);
 
     }
 
@@ -1600,6 +1612,17 @@
         // cast.getType() already returns the correct type so don't have to 
         // set here. This is a special case where output type is not
         // automatically determined.
+        
+        if(inputType == DataType.BYTEARRAY) {
+            try {
+                LoadFunc loadFunc = getLoadFunc(cast.getExpression());
+                cast.setLoadFunc(loadFunc);
+            } catch (FrontendException fee) {
+                throw new VisitorException("Cannot resolve load function to use for casting from " + 
+                            DataType.findTypeName(inputType) + " to " +
+                            DataType.findTypeName(expectedType) + ". " + fee.getMessage());
+            }
+        }
     }
     
     
@@ -1751,7 +1774,6 @@
         LogicalPlan currentPlan = mCurrentWalker.getPlan() ;
         List<LogicalOperator> list = currentPlan.getPredecessors(op) ;
 
-        // LOSplitOutput can only have 1 input
         try {
             // Compute the schema
             op.getSchema() ;
@@ -1766,6 +1788,22 @@
         }
     }
 
+    /**
+     * Type checks an LOLimit by regenerating its schema; failures are reported and rethrown.
+     */
+    @Override
+    protected void visit(LOLimit op) throws VisitorException {
+        try {
+            op.regenerateSchema() ;
+        } catch (FrontendException fe) {
+            String msg = "Problem while reading schemas from inputs of LOLimit" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(fe) ;
+            throw vse ;
+        }
+    }
+
     /***
      * Return concatenated of all fields from all input operators
      * If one of the inputs have no schema then we cannot construct
@@ -1853,6 +1891,19 @@
 
         checkInnerPlan(comparisonPlan) ;
               
+
+        /*
+        try {
+            System.err.println("Filter inner plan typechecked");
+            LOPrinter lv = new LOPrinter(System.err, comparisonPlan);
+            lv.visit();
+            System.err.println();
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+        }
+        */
+        
         byte innerCondType = comparisonPlan.getLeaves().get(0).getType() ;
         if (innerCondType != DataType.BOOLEAN) {
             String msg = "Filter's condition must evaluate to boolean. Found: " + DataType.findTypeName(innerCondType);
@@ -1890,7 +1941,7 @@
         
         try {
             // Compute the schema
-            split.getSchema() ;
+            split.regenerateSchema() ;
         }
         catch (FrontendException ioe) {
             String msg = "Problem while reconciling output schema of LOSplit" ;
@@ -2040,7 +2091,7 @@
     // as a new leave of the plan
     private void insertAtomicCastForCOGroupInnerPlan(LogicalPlan innerPlan,
                                                      LOCogroup cg,
-                                                     byte toType) {
+                                                     byte toType) throws VisitorException {
         if(!DataType.isUsableType(toType)) {
             throw new AssertionError("Cannot cast to type " + DataType.findTypeName(toType));
         }
@@ -2063,6 +2114,7 @@
             err.initCause(ioe) ;
             throw err ;
         }
+        this.visit(cast);
     }
 
     /**
@@ -2508,4 +2560,155 @@
         return new OperatorKey(scope, newId) ;
     }
 
+    /**
+     * Resolves the load function behind an expression by following the lineage
+     * recorded in its field schema; null if there is no field schema or no match.
+     * @throws FrontendException if load functions of more than one class are found
+     */
+    private LoadFunc getLoadFunc(ExpressionOperator exOp) throws FrontendException {
+        final Schema.FieldSchema fieldSchema = exOp.getFieldSchema();
+        if(fieldSchema == null) {
+            return null;
+        }
+        final Map<String, LogicalOperator> parents = fieldSchema.getCanonicalMap();
+        final MultiMap<LogicalOperator, String> reverseParents = fieldSchema.getReverseCanonicalMap();
+        final MultiMap<String, LoadFunc> found = new MultiMap<String, LoadFunc>();
+        if(parents.isEmpty()) {
+            // No named parents: fall back to the operator -> names reverse lookup.
+            for(LogicalOperator parentOp: reverseParents.keySet()) {
+                for(String name: reverseParents.get(parentOp)) {
+                    LoadFunc candidate = getLoadFunc(parentOp, name);
+                    if(candidate != null) found.put(candidate.getClass().getName(), candidate);
+                }
+            }
+        } else {
+            for(String name: parents.keySet()) {
+                LoadFunc candidate = getLoadFunc(exOp, name);
+                if(candidate != null) found.put(candidate.getClass().getName(), candidate);
+            }
+        }
+        switch(found.keySet().size()) {
+            case 0: return null;
+            case 1: return found.get(found.keySet().iterator().next()).iterator().next();
+            default: throw new FrontendException("Found more than one load function to use: " + found.keySet());
+        }
+    }
+
+    /**
+     * Resolves the load function behind the column parentCanonicalName of op by
+     * walking the recorded lineage back to a load or stream operator; null if none.
+     * @throws FrontendException if a user defined function is encountered or
+     * load functions of more than one class are found
+     */
+    private LoadFunc getLoadFunc(LogicalOperator op, String parentCanonicalName) throws FrontendException {
+        MultiMap<String, LoadFunc> loadFuncMap = new MultiMap<String, LoadFunc>();
+        if(op instanceof ExpressionOperator) {
+            if(op instanceof LOUserFunc) {
+                throw new FrontendException("Found a user defined function. Cannot determine the load function to use");
+            }
+            Schema.FieldSchema fs = ((ExpressionOperator)op).getFieldSchema();
+            if(null == fs) {
+                // No field schema means no lineage to follow; the other overloads return null here too.
+                return null;
+            }
+            if(fs.getCanonicalMap().keySet().size() > 0) {
+                for(String canonicalName: fs.getCanonicalMap().keySet()) {
+                    LoadFunc lf = getLoadFunc(fs, canonicalName);
+                    if(null != lf) loadFuncMap.put(lf.getClass().getName(), lf);
+                }
+            } else {
+                MultiMap<LogicalOperator, String> reverseCanonicalMap = fs.getReverseCanonicalMap();
+                for(LogicalOperator lop: reverseCanonicalMap.keySet()) {
+                    for(String canonicalName: reverseCanonicalMap.get(lop)) {
+                        LoadFunc lf = getLoadFunc(fs, canonicalName);
+                        if(null != lf) loadFuncMap.put(lf.getClass().getName(), lf);
+                    }
+                }
+            }
+        } else if(op instanceof LOLoad) {
+            return ((LOLoad)op).getLoadFunc();
+        } else if (op instanceof LOStream) {
+            // The stream's output spec names the function that is instantiated as its loader.
+            HandleSpec streamOutputSpec = ((LOStream)op).getStreamingCommand().getOutputSpec();
+            return (LoadFunc)PigContext.instantiateFuncFromSpec(streamOutputSpec.getSpec());
+        } else if ((op instanceof LOFilter)
+                || (op instanceof LODistinct)
+                || (op instanceof LOSort)
+                || (op instanceof LOSplit)
+                || (op instanceof LOSplitOutput)
+                || (op instanceof LOLimit)) {
+            // These operators are resolved through their first predecessor only.
+            return getLoadFunc(op.getPlan().getPredecessors(op).get(0), parentCanonicalName);
+        } else {
+            Schema s = op.getSchema();
+            if(null != s) {
+                for(Schema.FieldSchema fs: s.getFields()) {
+                    if(null != parentCanonicalName && (parentCanonicalName.equals(fs.canonicalName))) {
+                        if(fs.getCanonicalMap().keySet().size() > 0) {
+                            for(String canonicalName: fs.getCanonicalMap().keySet()) {
+                                LoadFunc lf = getLoadFunc(fs, canonicalName);
+                                if(null != lf) loadFuncMap.put(lf.getClass().getName(), lf);
+                            }
+                        } else {
+                            LoadFunc lf = getLoadFunc(fs, null);
+                            if(null != lf) loadFuncMap.put(lf.getClass().getName(), lf);
+                        }
+                    } else if (null == parentCanonicalName) {
+                        LoadFunc lf = getLoadFunc(fs, null);
+                        if(null != lf) loadFuncMap.put(lf.getClass().getName(), lf);
+                    }
+                }
+            } else {
+                for(LogicalOperator pred: op.getPlan().getPredecessors(op)) {
+                    LoadFunc lf = getLoadFunc(pred, parentCanonicalName);
+                    if(null != lf) loadFuncMap.put(lf.getClass().getName(), lf);
+                }
+            }
+        }
+        if(loadFuncMap.keySet().size() == 0) {
+            return null;
+        }
+        if(loadFuncMap.keySet().size() == 1) {
+            String lfString = loadFuncMap.keySet().iterator().next();
+            return (LoadFunc)(loadFuncMap.get(lfString).iterator().next());
+        }
+        throw new FrontendException("Found more than one load function to use: " + loadFuncMap.keySet());
+    }
+
+    /**
+     * Resolves the load function for a field by following its recorded parents,
+     * restricted to parentCanonicalName unless that name is null. Returns null
+     * when fs is null or nothing is found.
+     * @throws FrontendException if load functions of more than one class are found
+     */
+    private LoadFunc getLoadFunc(Schema.FieldSchema fs, String parentCanonicalName) throws FrontendException {
+        if(fs == null) {
+            return null;
+        }
+        final Map<String, LogicalOperator> parents = fs.getCanonicalMap();
+        final MultiMap<LogicalOperator, String> reverseParents = fs.getReverseCanonicalMap();
+        final MultiMap<String, LoadFunc> found = new MultiMap<String, LoadFunc>();
+        // A null parentCanonicalName matches every recorded parent name.
+        if(parents.isEmpty()) {
+            for(LogicalOperator parentOp: reverseParents.keySet()) {
+                for(String name: reverseParents.get(parentOp)) {
+                    if(parentCanonicalName != null && !parentCanonicalName.equals(name)) continue;
+                    LoadFunc candidate = getLoadFunc(parentOp, parentCanonicalName);
+                    if(candidate != null) found.put(candidate.getClass().getName(), candidate);
+                }
+            }
+        } else {
+            for(String name: parents.keySet()) {
+                if(parentCanonicalName != null && !parentCanonicalName.equals(name)) continue;
+                LoadFunc candidate = getLoadFunc(parents.get(name), parentCanonicalName);
+                if(candidate != null) found.put(candidate.getClass().getName(), candidate);
+            }
+        }
+        switch(found.keySet().size()) {
+            case 0: return null;
+            case 1: return found.get(found.keySet().iterator().next()).iterator().next();
+            default: throw new FrontendException("Found more than one load function to use: " + found.keySet());
+        }
+    }
+
 }