You are viewing a plain-text version of this content. The canonical link to it is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by th...@apache.org on 2010/08/30 19:15:55 UTC

svn commit: r990868 - in /hadoop/pig/trunk: ./ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/schema/ src/org/apache/pig/impl/logicalLayer/validators/ test/org/apache/pig/test/

Author: thejas
Date: Mon Aug 30 17:15:54 2010
New Revision: 990868

URL: http://svn.apache.org/viewvc?rev=990868&view=rev
Log:
PIG-1482: Pig gets confused when more than one loader is involved (xuefuz via thejas)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Aug 30 17:15:54 2010
@@ -171,6 +171,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1482: Pig gets confused when more than one loader is involved (xuefuz via thejas)
+
 PIG-1579: Intermittent unit test failure for TestScriptUDF.testPythonScriptUDFNullInputOutput (daijy)
 
 PIG-1557: couple of issue mapping aliases to jobs (rding)

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java Mon Aug 30 17:15:54 2010
@@ -69,20 +69,16 @@ public class LODistinct extends Relation
                     throw new FrontendException(msg, errCode, PigException.BUG, false, null);
                 }
                 if(op instanceof ExpressionOperator) {
-                    Schema.FieldSchema fs = new Schema.FieldSchema(((ExpressionOperator)op).getFieldSchema());
+                    Schema.FieldSchema fs = Schema.FieldSchema.copyAndLink(((ExpressionOperator)op).getFieldSchema(), op);
                     if(DataType.isSchemaType(fs.type)) {
                         mSchema = fs.schema;
                     } else {
                         fss.add(fs);
                         mSchema = new Schema(fss);
-                        for (int i=0;i<getInput().getSchema().size();i++)
-                            mSchema.getField(i).setParent(getInput().getSchema().getField(i).canonicalName, getInput());
                     }
                 } else {
                     if (op.getSchema()!=null) {
-                        mSchema = new Schema(op.getSchema());
-                        for (int i=0;i<op.getSchema().size();i++)
-                            mSchema.getField(i).setParent(op.getSchema().getField(i).canonicalName, op);
+                        mSchema = Schema.copyAndLink(op.getSchema(), op);
                     }
                     else
                         mSchema = null;

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java Mon Aug 30 17:15:54 2010
@@ -75,7 +75,7 @@ public class LOFilter extends Relational
             ArrayList<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
             try {
                 if(input instanceof ExpressionOperator) {
-                    Schema.FieldSchema fs = new Schema.FieldSchema(((ExpressionOperator)input).getFieldSchema());
+                    Schema.FieldSchema fs = Schema.FieldSchema.copyAndLink(((ExpressionOperator)input).getFieldSchema(), input);
                     if(DataType.isSchemaType(fs.type)) {
                         mSchema = fs.schema;
                     } else {
@@ -84,9 +84,7 @@ public class LOFilter extends Relational
                     }
                 } else {
                     if (getInput().getSchema()!=null) {
-                        mSchema = new Schema(input.getSchema());
-                        for (int i=0;i<getInput().getSchema().size();i++)
-                            mSchema.getField(i).setParent(getInput().getSchema().getField(i).canonicalName, getInput());
+                        mSchema = Schema.copyAndLink( input.getSchema(), input );
                     }
                     else
                         mSchema = null;

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Mon Aug 30 17:15:54 2010
@@ -242,8 +242,7 @@ public class LOForEach extends Relationa
 						    if(null != s && s.size()!=0) {
 						        for(int i = 0; i < s.size(); ++i) {
                                     Schema.FieldSchema fs;
-                                    fs = new Schema.FieldSchema(s.getField(i));
-                                    fs.setParent(s.getField(i).canonicalName, op);
+                                    fs = Schema.FieldSchema.copyAndLink(s.getField(i), op);
 									log.debug("fs: " + fs);
                                     if(null != userDefinedSchema) {
                                         Schema.FieldSchema userDefinedFieldSchema;
@@ -301,7 +300,7 @@ public class LOForEach extends Relationa
                                         updateAliasCount(aliases, newFs.alias);
                                         fss.add(newFs);
                                         mSchemaPlanMapping.add(plan);
-                                        newFs.setParent(null, op);
+                                        newFs.setParent(planFs.canonicalName, op);
                                     } else {
                                         for(Schema.FieldSchema ufs: userDefinedSchema.getFields()) {
                                             Schema.FieldSchema.setFieldSchemaDefaultType(ufs, DataType.BYTEARRAY);
@@ -320,14 +319,14 @@ public class LOForEach extends Relationa
                                     }
                                     fss.add(newFs);
                                     mSchemaPlanMapping.add(plan);
-                                    newFs.setParent(null, op);
+                                    newFs.setParent( planFs.canonicalName, op );
                                 }
 							}
 						} else {
 							//just populate the schema with the field schema of the expression operator
                             //check if the user has defined a schema for the operator; compare the schema
                             //with that of the expression operator field schema and then add it to the list
-                            Schema.FieldSchema newFs = new Schema.FieldSchema(planFs);
+                            Schema.FieldSchema newFs = Schema.FieldSchema.copyAndLink(planFs, op);
                             if(null != userDefinedSchema) {
                                 try {
                                     newFs = newFs.mergePrefixFieldSchema(userDefinedSchema.getField(0));

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOGenerate.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOGenerate.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOGenerate.java Mon Aug 30 17:15:54 2010
@@ -134,7 +134,7 @@ public class LOGenerate extends LogicalO
                     throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
                 }
                 if(op instanceof ExpressionOperator) {
-                    fss.add(new Schema.FieldSchema(((ExpressionOperator)op).getFieldSchema()));
+                    fss.add( Schema.FieldSchema.copyAndLink(((ExpressionOperator)op).getFieldSchema(), op));
                     mSchema = new Schema(fss);
                 } else {
                     mSchema = op.getSchema();

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java Mon Aug 30 17:15:54 2010
@@ -64,9 +64,7 @@ public class LOLimit extends RelationalO
         if (!mIsSchemaComputed) {
             try {
                 if (getInput().getSchema()!=null) {
-                    mSchema = new Schema(getInput().getSchema());
-                    for (int i=0;i<getInput().getSchema().size();i++)
-                        mSchema.getField(i).setParent(getInput().getSchema().getField(i).canonicalName, getInput());
+                    mSchema = Schema.copyAndLink(getInput().getSchema(), getInput());
                 }
                 else
                     mSchema = null;

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Mon Aug 30 17:15:54 2010
@@ -173,6 +173,8 @@ public class LOLoad extends RelationalOp
                 mSchema = null;
                 throw fee;
             }
+            // Set the parent of all fields in the schema as this (LOLoad instance) with parent canonicalName as null.
+            setParent( mSchema );
         }
         return mSchema;
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java Mon Aug 30 17:15:54 2010
@@ -31,6 +31,7 @@ import org.apache.pig.impl.plan.PlanVisi
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -203,8 +204,7 @@ public class LOProject extends Expressio
                     if(!mSentinel) {
                         //we have an expression operator and hence a list of field shcemas
                         Schema.FieldSchema fs = ((ExpressionOperator)expressionOperator).getFieldSchema();
-                         mFieldSchema = new Schema.FieldSchema(fs);
-                         mFieldSchema.setParent(fs.canonicalName, expressionOperator);
+                         mFieldSchema = Schema.FieldSchema.copyAndLink(fs, expressionOperator);
                     } else {
                         //we have a relational operator as input and hence a schema
                         log.debug("expression operator alias: " + expressionOperator.getAlias());
@@ -271,8 +271,7 @@ public class LOProject extends Expressio
                                             // normal single level access
                                             fs = s.getField(mProjection.get(0));
                                         }
-                                        mFieldSchema = new Schema.FieldSchema(fs);
-                                        mFieldSchema.setParent(fs.canonicalName, expressionOperator);
+                                        mFieldSchema = FieldSchema.copyAndLink( fs, expressionOperator );
                                     } else {
                                         mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
                                         mFieldSchema.setParent(expOpFs.canonicalName, expressionOperator);
@@ -287,8 +286,7 @@ public class LOProject extends Expressio
                                 log.debug("s: " + s);
                                 if(null != s) {
                                     Schema.FieldSchema fs = s.getField(mProjection.get(0));
-                                    mFieldSchema = new Schema.FieldSchema(fs);
-                                    mFieldSchema.setParent(fs.canonicalName, expressionOperator);
+                                    mFieldSchema = FieldSchema.copyAndLink( fs, expressionOperator );
                                     log.debug("mFieldSchema alias: " + mFieldSchema.alias);
                                     log.debug("mFieldSchema schema: " + mFieldSchema.schema);
                                 } else {
@@ -312,9 +310,8 @@ public class LOProject extends Expressio
                                     if(null != s) {
                                         if(colNum < s.size()) {
                                             Schema.FieldSchema parentFs = s.getField(colNum);
-                                            fs = new Schema.FieldSchema(parentFs);
+                                            fs = Schema.FieldSchema.copyAndLink(parentFs, expressionOperator );
                                             fss.add(fs);
-                                            fs.setParent(parentFs.canonicalName, expressionOperator);
                                         } else {
                                             fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
                                             fss.add(fs);
@@ -334,9 +331,8 @@ public class LOProject extends Expressio
                                 Schema s = expressionOperator.getSchema();
                                 if(null != s) {
                                     Schema.FieldSchema parentFs = s.getField(colNum);
-                                    fs = new Schema.FieldSchema(parentFs);
+                                    fs = Schema.FieldSchema.copyAndLink(parentFs, expressionOperator);
                                     fss.add(fs);
-                                    fs.setParent(parentFs.canonicalName, expressionOperator);
                                 } else {
                                     fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
                                     fss.add(fs);
@@ -351,7 +347,8 @@ public class LOProject extends Expressio
                 //    throw new FrontendException(pe.getMessage());
                 //}
                 mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), new Schema(fss));
-                mFieldSchema.setParent(null, expressionOperator);
+                Schema.FieldSchema expOpFs = ((ExpressionOperator)expressionOperator).getFieldSchema();
+                mFieldSchema.setParent( expOpFs.canonicalName, expressionOperator );
                 mIsFieldSchemaComputed = true;
             }
 
@@ -381,7 +378,7 @@ public class LOProject extends Expressio
                 if(!DataType.isSchemaType(mType)) {
                     Schema pjSchema = new Schema(mFieldSchema);
                     mFieldSchema = new Schema.FieldSchema(getAlias(), pjSchema, DataType.BAG);
-                    mFieldSchema.setParent(null, expressionOperator);
+                    mFieldSchema.setParent( ((LOProject)expressionOperator).mFieldSchema.canonicalName, expressionOperator );
                 } else {
                     if(null != mFieldSchema) {
                         mFieldSchema.type = DataType.BAG;

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java Mon Aug 30 17:15:54 2010
@@ -147,7 +147,7 @@ public class LOSort extends RelationalOp
                     throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
                 }
                 if(op instanceof ExpressionOperator) {
-                    Schema.FieldSchema fs = new Schema.FieldSchema(((ExpressionOperator)op).getFieldSchema());
+                    Schema.FieldSchema fs = Schema.FieldSchema.copyAndLink(((ExpressionOperator)op).getFieldSchema(), op);
                     if(DataType.isSchemaType(fs.type)) {
                         mSchema = fs.schema;
                     } else {
@@ -156,9 +156,7 @@ public class LOSort extends RelationalOp
                     }
                 } else {
                     if (getInput().getSchema()!=null) {
-                        mSchema = new Schema(op.getSchema());
-                        for (int i=0;i<getInput().getSchema().size();i++)
-                            mSchema.getField(i).setParent(getInput().getSchema().getField(i).canonicalName, getInput());
+                        mSchema = Schema.copyAndLink( op.getSchema(), op );
                     }
                     else
                         mSchema = null;

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java Mon Aug 30 17:15:54 2010
@@ -88,9 +88,7 @@ public class LOSplit extends RelationalO
                 }
                 LogicalOperator input = s.iterator().next();
                 if (input.getSchema()!=null) {
-                    mSchema = new Schema(input.getSchema());
-                    for (int i=0;i<input.getSchema().size();i++)
-                        mSchema.getField(i).setParent(input.getSchema().getField(i).canonicalName, input);
+                    mSchema = Schema.copyAndLink(input.getSchema(), input);
                 }
                 else
                     mSchema = null;

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java Mon Aug 30 17:15:54 2010
@@ -90,9 +90,7 @@ public class LOSplitOutput extends Relat
                     throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
                 }
                 if (input.getSchema()!=null) {
-                    mSchema = new Schema(input.getSchema());
-                    for (int i=0;i<input.getSchema().size();i++)
-                        mSchema.getField(i).setParent(input.getSchema().getField(i).canonicalName, input);
+                    mSchema = Schema.copyAndLink(input.getSchema(), input);
                 }
                 else
                     mSchema = null;

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java Mon Aug 30 17:15:54 2010
@@ -56,6 +56,9 @@ public class LOStream extends Relational
     // Stream Operator this operator represents
     private StreamingCommand command;
     transient private ExecutableManager executableManager;
+    
+    private boolean isParentSet = false;
+    
     /**
      * Create a new <code>LOStream</code> with the given command.
      * 
@@ -87,24 +90,15 @@ public class LOStream extends Relational
      */
     @Override
     public Schema getSchema() throws FrontendException {
-        return mSchema;
-        /*
-        if (!mIsSchemaComputed) {
-            /*
-            LogicalOperator input = mPlan.getPredecessors(this).get(0);
-            ArrayList<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
-            try {
-                mSchema = input.getSchema();
-                mIsSchemaComputed = true;
-            } catch (FrontendException ioe) {
-                mSchema = null;
-                mIsSchemaComputed = false;
-                throw ioe;
-            }
+    	if( mSchema == null )
+    		return null;
+    	
+        if( !isParentSet ) {
+        	setParent( mSchema );
+        	isParentSet = true;
         }
+        
         return mSchema;
-        */
-
     }
     
     /**

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Mon Aug 30 17:15:54 2010
@@ -162,6 +162,22 @@ abstract public class LogicalOperator ex
             mSchema.reconcile(schema);
         }
     }
+    
+    /**
+     * Set the parent of the schema field in the schema hierarchy. Currently only used by
+     * LOStream and LOLoad.
+     * 
+     * @param schema the schema instance to set parent for
+     */
+    protected final void setParent(Schema schema) {
+    	if( schema == null )
+    		return;
+    	
+    	for( Schema.FieldSchema fs : schema.getFields() ) {
+    		fs.setParent( null, this );
+   			setParent( fs.schema );
+    	}
+    }
 
     /**
      * Directly force the schema without reconcilation

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Mon Aug 30 17:15:54 2010
@@ -177,11 +177,13 @@ public class Schema implements Serializa
             alias = a;
             schema = s;
             log.debug("t: " + t + " Bag: " + DataType.BAG + " tuple: " + DataType.TUPLE);
+            
             if ((null != s) && !(DataType.isSchemaType(t))) {
                 int errCode = 1020;
                 throw new FrontendException("Only a BAG or TUPLE can have schemas. Got "
                         + DataType.findTypeName(t), errCode, PigException.INPUT);
             }
+            
             type = t;
             canonicalName = canonicalNamer.getNewName();
             canonicalMap = new HashMap<String, LogicalOperator>();
@@ -214,6 +216,35 @@ public class Schema implements Serializa
             reverseCanonicalMap = new MultiMap<LogicalOperator, String>();
         }
 
+        /**
+         * Make a copy of the FieldSchema instance and link the new one to the old one with canonical map.
+         * 
+         * @param fs FieldSchema instance to be copied.
+         * @param op The operator to which the old FieldSchema instance belongs.
+         * @return a new copy
+         * @throws FrontendException
+         */
+        public static FieldSchema copyAndLink(FieldSchema fs, LogicalOperator op) {
+        	String alias = null;
+        	Schema schema = null;
+        	byte type = DataType.UNKNOWN;
+            if( null != fs ) {
+                alias = fs.alias;
+                if( null != fs.schema ) {
+                    schema = Schema.copyAndLink( fs.schema, op );
+                } else {
+                    schema = null;
+                }
+                type = fs.type;
+            }
+            
+            FieldSchema fieldSchema = new FieldSchema( alias, schema );
+            fieldSchema.type = type;
+            fieldSchema.setParent( fs == null ? null : fs.canonicalName, op );
+            
+            return fieldSchema;
+        }
+        
         public void setParent(String parentCanonicalName, LogicalOperator parent) {
             if(null != parentCanonicalName) {
                 canonicalMap.put(parentCanonicalName, parent);
@@ -569,6 +600,22 @@ public class Schema implements Serializa
             return (fs.type == DataType.NULL || fs.type == DataType.UNKNOWN);
         }
 
+        /**
+         * Find a field schema instance in this FieldSchema hierarchy (including "this")
+         * that matches the given canonical name.
+         * 
+         * @param canonicalName canonical name
+         * @return the FieldSchema instance found
+         */
+		public FieldSchema findFieldSchema(String canonicalName) {
+	        if( this.canonicalName.equals(canonicalName) ) {
+	        	return this;
+	        }
+	        if( this.schema != null )
+	        	return schema.findFieldSchema( canonicalName );
+	        return null;
+        }
+
     }
 
     private List<FieldSchema> mFields;
@@ -673,6 +720,42 @@ public class Schema implements Serializa
     }
 
     /**
+     * Make a copy of the given schema object and link the original with the copy using 
+     * canonical name map.
+     * 
+     * @param s The original schema
+     * @param op The operator to which the original belongs
+     * @return a new copy
+     */
+    public static Schema copyAndLink(Schema s, LogicalOperator op) {
+    	Schema result = new Schema();
+        if(null != s) {
+            result.twoLevelAccessRequired = s.twoLevelAccessRequired;
+            try {
+                for( int i = 0; i < s.size(); ++i ) {
+                    FieldSchema fs = FieldSchema.copyAndLink( s.getField(i), op );
+                    result.mFields.add(fs);
+                    if(null != fs) {
+                        if (fs.alias != null) {
+                            result.mAliases.put(fs.alias, fs);
+                            result.mFieldSchemas.put(fs.canonicalName, fs.alias);
+                        }
+                    }
+                }
+            } catch (FrontendException pe) {
+            	result.mFields = new ArrayList<FieldSchema>();
+            	result.mAliases = new HashMap<String, FieldSchema>();
+            	result.mFieldSchemas = new MultiMap<String, String>();
+            }
+        } else {
+        	result.mFields = new ArrayList<FieldSchema>();
+        	result.mAliases = new HashMap<String, FieldSchema>();
+        	result.mFieldSchemas = new MultiMap<String, String>();
+        }
+	    return result;
+    }
+
+    /**
      * Given an alias name, find the associated FieldSchema.
      * @param alias Alias to look up.
      * @return FieldSchema, or null if no such alias is in this tuple.
@@ -1808,6 +1891,24 @@ public class Schema implements Serializa
         }
         return new Schema(fsList);
     }
+
+    /**
+     * Look for a FieldSchema instance in the schema hierarchy which has the given canonical name.
+     * @param canonicalName canonical name
+     * @return the FieldSchema instance found
+     */
+	public FieldSchema findFieldSchema(String canonicalName) {
+	    for( FieldSchema fs : mFields ) {
+	    	if( fs.canonicalName.equals( canonicalName ) )
+	    		return fs;
+	    	if( fs.schema != null ) {
+	    		FieldSchema result = fs.schema.findFieldSchema( canonicalName );
+	    		if( result != null )
+	    			return result;
+	    	}
+	    }
+	    return null;
+    }
     
 }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Mon Aug 30 17:15:54 2010
@@ -1015,6 +1015,18 @@ public class TypeCheckingVisitor extends
         this.visit(cast);
     }
 
+    /**
+     * The cast insertion for UDF is slight different in that we need to link the SchemaField
+     * in the cast with its parent. This is because we don't call its getSchemafield() when 
+     * looking for loadfuncSpec. See getLoadFuncSpec(LogicalOperator op, String parentCanonicalName)
+     * for more information.
+     */
+    private void insertCastForUDF(LOUserFunc udf,
+    		FieldSchema fromFS, FieldSchema toFs, ExpressionOperator predecessor) 
+    throws VisitorException {
+        toFs.setParent( fromFS.canonicalName, predecessor );
+        insertCast( udf, fromFS.type, toFs, predecessor );
+    }
     
     
     /**
@@ -1593,7 +1605,7 @@ public class TypeCheckingVisitor extends
             if(fFSch.type==tFSch.type) {
                 continue;
             }
-            insertCast(udf, tFSch.type, tFSch, args.get(i));
+            insertCastForUDF(udf, fFSch, tFSch, args.get(i));
         }
     }
 
@@ -1764,8 +1776,11 @@ public class TypeCheckingVisitor extends
         
         if(inputType == DataType.BYTEARRAY) {
             try {
-                FuncSpec loadFuncSpec = getLoadFuncSpec(cast.getExpression());
-                cast.setLoadFuncSpec(loadFuncSpec);
+            	Map<String, LogicalOperator> canonicalMap = cast.getFieldSchema().getCanonicalMap();
+            	for( Map.Entry<String, LogicalOperator> entry : canonicalMap.entrySet() ) {
+                    FuncSpec loadFuncSpec = getLoadFuncSpec( entry.getValue(), entry.getKey() );
+                    cast.setLoadFuncSpec( loadFuncSpec );
+            	}
             } catch (FrontendException fee) {
                 int errCode = 1053;
                 String msg = "Cannot resolve load function to use for casting from " + 
@@ -3049,44 +3064,6 @@ public class TypeCheckingVisitor extends
         return new OperatorKey(scope, newId) ;
     }
 
-    private FuncSpec getLoadFuncSpec(ExpressionOperator exOp) throws FrontendException {
-        Schema.FieldSchema fs = exOp.getFieldSchema();
-        if(null == fs) {
-            return null;
-        }
-
-        Map<String, LogicalOperator> canonicalMap = fs.getCanonicalMap();
-        MultiMap<LogicalOperator, String> reverseCanonicalMap = fs.getReverseCanonicalMap();
-        MultiMap<String, FuncSpec> loadFuncSpecMap = new MultiMap<String, FuncSpec>();
-        
-        if(canonicalMap.keySet().size() > 0) {
-            for(String parentCanonicalName: canonicalMap.keySet()) {
-                FuncSpec lfSpec = getLoadFuncSpec(exOp, parentCanonicalName);
-                if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
-            }
-        } else {
-            for(LogicalOperator op: reverseCanonicalMap.keySet()) {
-                for(String parentCanonicalName: reverseCanonicalMap.get(op)) {
-                    FuncSpec lfSpec = getLoadFuncSpec(op, parentCanonicalName);
-                    if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
-                }
-            }
-        }
-        if(loadFuncSpecMap.keySet().size() == 0) {
-            return null;
-        }
-        if(loadFuncSpecMap.keySet().size() == 1) {
-            String lfString = loadFuncSpecMap.keySet().iterator().next();
-            return loadFuncSpecMap.get(lfString).iterator().next();
-        }
-
-        {
-            int errCode = 1065;
-            String msg = "Found more than one load function to use: " + loadFuncSpecMap.keySet();
-            throw new FrontendException(msg, errCode, PigException.INPUT);
-        }
-    }
-
     private FuncSpec getLoadFuncSpec(LogicalOperator op, String parentCanonicalName) throws FrontendException {
         MultiMap<String, FuncSpec> loadFuncSpecMap = new MultiMap<String, FuncSpec>();
         if(op instanceof ExpressionOperator) {
@@ -3095,22 +3072,10 @@ public class TypeCheckingVisitor extends
             }
             
             Schema.FieldSchema fs = ((ExpressionOperator)op).getFieldSchema();
-            Map<String, LogicalOperator> canonicalMap = fs.getCanonicalMap();
-            MultiMap<LogicalOperator, String> reverseCanonicalMap = fs.getReverseCanonicalMap();
-            
-            if(canonicalMap.keySet().size() > 0) {
-                for(String canonicalName: canonicalMap.keySet()) {
-                    FuncSpec lfSpec = getLoadFuncSpec(fs, canonicalName);
-                    if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
-                }
-            } else {
-                for(LogicalOperator lop: reverseCanonicalMap.keySet()) {
-                    for(String canonicalName: reverseCanonicalMap.get(lop)) {
-                        FuncSpec lfSpec = getLoadFuncSpec(fs, canonicalName);
-                        if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
-                    }
-                }
+            if( parentCanonicalName != null ) {
+            	fs = fs.findFieldSchema( parentCanonicalName );
             }
+            getLoadFuncSpec( fs, loadFuncSpecMap );
         } else {
             if(op instanceof LOLoad) {
                 return ((LOLoad)op).getInputFile().getFuncSpec();
@@ -3123,22 +3088,8 @@ public class TypeCheckingVisitor extends
             
             Schema s = op.getSchema();
             if(null != s) {
-                for(Schema.FieldSchema fs: s.getFields()) {
-                    if(null != parentCanonicalName && (parentCanonicalName.equals(fs.canonicalName))) {
-                        if(fs.getCanonicalMap().keySet().size() > 0) {
-                            for(String canonicalName: fs.getCanonicalMap().keySet()) {
-                                FuncSpec lfSpec = getLoadFuncSpec(fs, canonicalName);
-                                if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
-                            }
-                        } else {
-                            FuncSpec lfSpec = getLoadFuncSpec(fs, null);
-                            if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
-                        }
-                    } else if (null == parentCanonicalName) {
-                        FuncSpec lfSpec = getLoadFuncSpec(fs, null);
-                        if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
-                    }
-                }
+            	FieldSchema fieldSchema = s.findFieldSchema( parentCanonicalName );
+                getLoadFuncSpec( fieldSchema, loadFuncSpecMap );
             } else {
                 LogicalPlan lp = op.getPlan();
                 for(LogicalOperator pred: lp.getPredecessors(op)) {
@@ -3161,44 +3112,29 @@ public class TypeCheckingVisitor extends
             throw new FrontendException(msg, errCode, PigException.INPUT);
         }
     }
-
-    private FuncSpec getLoadFuncSpec(Schema.FieldSchema fs, String parentCanonicalName) throws FrontendException {
-        if(null == fs) {
-            return null;
-        }
-        Map<String, LogicalOperator> canonicalMap = fs.getCanonicalMap();
-        MultiMap<LogicalOperator, String> reverseCanonicalMap = fs.getReverseCanonicalMap();
-        MultiMap<String, FuncSpec> loadFuncSpecMap = new MultiMap<String, FuncSpec>();
-
-        if(canonicalMap.keySet().size() > 0) {
-            for(Map.Entry<String, LogicalOperator> e: canonicalMap.entrySet()) {
-                if((null == parentCanonicalName) || (parentCanonicalName.equals(e.getKey()))) {
-                    FuncSpec lfSpec = getLoadFuncSpec(e.getValue(), parentCanonicalName);
-                    if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
-                }
-            }
+    
+    /**
+     * Adds to loadFuncSpecMap (a MultiMap keyed by FuncSpec class name) the load FuncSpec found for each parent of fieldSchema; no-op if fieldSchema is null.
+     */
+    private void getLoadFuncSpec(FieldSchema fieldSchema, MultiMap<String, FuncSpec> loadFuncSpecMap) 
+    throws FrontendException {
+    	if( fieldSchema == null )
+    		return;
+    	
+        Map<String, LogicalOperator> canonicalMap = fieldSchema.getCanonicalMap();
+        if( canonicalMap.size() > 0 ) { // named links: parent canonical name -> parent operator
+        	for(Map.Entry<String, LogicalOperator> entry : canonicalMap.entrySet() ) {
+                FuncSpec lfSpec = getLoadFuncSpec( entry.getValue(), entry.getKey() );
+                if( null != lfSpec ) 
+                	loadFuncSpecMap.put( lfSpec.getClassName(), lfSpec );
+        	}
          } else {
-            for(LogicalOperator op: reverseCanonicalMap.keySet()) {
-                for(String canonicalName: reverseCanonicalMap.get(op)) {
-                    if((null == parentCanonicalName) || (parentCanonicalName.equals(canonicalName))) {
-                        FuncSpec lfSpec = getLoadFuncSpec(op, parentCanonicalName);
-                        if(null != lfSpec) loadFuncSpecMap.put(lfSpec.getClassName(), lfSpec);
-                    }
-                }
-            }
-        }
-        if(loadFuncSpecMap.keySet().size() == 0) {
-            return null;
-        }
-        if(loadFuncSpecMap.keySet().size() == 1) {
-            String lfString = loadFuncSpecMap.keySet().iterator().next();
-            return loadFuncSpecMap.get(lfString).iterator().next();
-        }
-
-        {
-            int errCode = 1065;
-            String msg = "Found more than one load function to use: " + loadFuncSpecMap.keySet();
-            throw new FrontendException(msg, errCode, PigException.INPUT);
+        	MultiMap<LogicalOperator, String> reverseCanonicalMap = fieldSchema.getReverseCanonicalMap();
+        	for( LogicalOperator lop : reverseCanonicalMap.keySet() ) {
+        		FuncSpec lfSpec = getLoadFuncSpec( lop, null ); // NOTE(review): names in the reverse map are ignored; presumably they are null whenever canonicalMap is empty - confirm
+        		if( null != lfSpec )
+        			loadFuncSpecMap.put( lfSpec.getClassName(), lfSpec );
+        	}
          }
      }
 

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=990868&r1=990867&r2=990868&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java Mon Aug 30 17:15:54 2010
@@ -5777,5 +5777,163 @@ public class TestTypeCheckingValidator e
         }
         
     }
+    
+    @Test
+    public void testLineageMultipleLoader1() throws FrontendException {
+        planTester.buildPlan( "A = LOAD 'data1' USING PigStorage() AS (u, v, w);" ) ;
+        planTester.buildPlan( "B = LOAD 'data2' USING TextLoader() AS (x, y);" ) ;
+        planTester.buildPlan("C = JOIN A BY u, B BY x USING 'replicated';") ;
+        planTester.buildPlan("D = GROUP C BY (u, x);");
+        LogicalPlan plan = planTester.buildPlan( "E = FOREACH D GENERATE (chararray)group.u, (int)group.x;" ); // group.u traces to A, group.x to B
 
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        // Check group.u: it comes from A, so its cast must use PigStorage
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+        LOProject proj = (LOProject)foreachPlan.getSuccessors(exOp).get(0);
+        LOCast cast = (LOCast)foreachPlan.getSuccessors( proj ).get( 0 );
+        assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("PigStorage"));
+
+        // Check group.x: it comes from B, so its cast must use TextLoader
+        foreachPlan = foreach.getForEachPlans().get( 1 );
+        exOp = foreachPlan.getRoots().get(0);
+        proj = (LOProject)foreachPlan.getSuccessors(exOp).get(0);
+        cast = (LOCast)foreachPlan.getSuccessors( proj ).get( 0 );
+        assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("TextLoader"));
+    }
+
+    /**
+     * Regression test for PIG-1482: the cast on group.v1 (derived from A, PigStorage) must not be confused by the TextLoader input D.
+     * @throws FrontendException
+     */
+    @Test
+    public void testLineageMultipleLoader2() throws FrontendException {
+        planTester.buildPlan( "A = LOAD 'data1' USING PigStorage() AS (s, m, l);" ) ;
+        planTester.buildPlan( "B = FOREACH A GENERATE s#'k1' as v1, m#'k2' as v2, l#'k3' as v3;" ) ;
+        planTester.buildPlan( "C = FOREACH B GENERATE v1, (v2 == 'v2' ? 1L : 0L) as v2:long, (v3 == 'v3' ? 1 :0) as v3:int;" ) ;
+        planTester.buildPlan( "D = LOAD 'data2' USING TextLoader() AS (a);");
+        planTester.buildPlan( "E = JOIN C BY v1, D BY a USING 'replicated';" );
+        planTester.buildPlan( "F = GROUP E BY (v1, a);" );
+        LogicalPlan plan = planTester.buildPlan( "G = FOREACH F GENERATE (chararray)group.v1, group.a;" );
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        // Check group.v1: it comes from A, so its cast must use PigStorage
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+        LOProject proj = (LOProject)foreachPlan.getSuccessors(exOp).get(0);
+        LOCast cast = (LOCast)foreachPlan.getSuccessors( proj ).get( 0 );
+        assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("PigStorage"));
+    }
+
+    /**
+     * COGROUP key fed by two loaders (PigStorage and TextLoader): casting it must fail to resolve a load function.
+     */
+    @Test
+    public void testLineageMultipleLoader3() throws FrontendException {
+        planTester.buildPlan( "A = LOAD 'data1' USING PigStorage() AS (u, v, w);" ) ;
+        planTester.buildPlan( "B = LOAD 'data2' USING TextLoader() AS (x, y);" ) ;
+        planTester.buildPlan("C = COGROUP A BY u, B by x;");
+        LogicalPlan plan = planTester.buildPlan( "D = FOREACH C GENERATE (chararray)group;" );
+
+        // validate; a PlanValidationException is the expected outcome
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        try {
+        	typeValidator.validate(plan, collector) ;
+        } catch(PlanValidationException ex) {
+        	assertTrue( "Unexpected cause: " + ex.getCause(), ex.getCause() != null && ex.getCause().toString().contains( "Cannot resolve load function to use for casting from bytearray to chararray." ) );
+        	return;
+        }
+        assertTrue( "Validation failure is expected.", false );
+    }
+    
+    /**
+     * Lineage through a FILTER: the cast on tuple field w.a must still resolve to A's PigStorage.
+     */
+    @Test
+    public void testLineageFilterWithTuple() throws FrontendException {
+        planTester.buildPlan( "A = LOAD 'data1' USING PigStorage() AS (u, v, w:tuple(a,b));" ) ;
+        planTester.buildPlan( "B = FOREACH A generate v, w;");
+        planTester.buildPlan( "C = FILTER B by v < 50;" ) ;
+        LogicalPlan plan = planTester.buildPlan("D = FOREACH C generate (int)w.a;");
+
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+        // The cast on w.a must use PigStorage even though B and C sit between it and the load
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+        LogicalOperator exOp = foreachPlan.getRoots().get(0);
+        LOProject proj = (LOProject)foreachPlan.getSuccessors(exOp).get(0);
+        LOCast cast = (LOCast)foreachPlan.getSuccessors( proj ).get( 0 );
+        assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("PigStorage"));
+    }
+    
+    @Test
+    public void testLineageExpressionCasting() throws FrontendException {
+        planTester.buildPlan( "A = LOAD 'data1' USING PigStorage() AS (u:int, v);" ) ;
+        planTester.buildPlan( "B = FILTER A by u < 50;" ) ;
+        LogicalPlan plan = planTester.buildPlan("C = FOREACH B generate u + v;");
+        // One operand of u + v gets a cast; through the FILTER it must still resolve to PigStorage.
+        // validate
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;
+
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+        if (collector.hasError()) {
+            throw new AssertionError("Expect no  error") ;
+        }
+
+        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+        LogicalPlan foreachPlan = foreach.getForEachPlans().get(0);
+        List<LogicalOperator> projs = foreachPlan.getRoots(); // two roots; either one may carry the cast
+        LogicalOperator proj = projs.get(1);
+        LogicalOperator op = foreachPlan.getSuccessors( proj ).get( 0 );
+        if( !( op instanceof LOCast ) ) {
+            proj = projs.get(0); // fall back to the other root (was get(1) again, a no-op)
+            op = foreachPlan.getSuccessors( proj ).get( 0 );
+        }
+        LOCast cast = (LOCast)op;
+        assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("PigStorage"));
+    }
+    
 }