You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/14 00:45:17 UTC

svn commit: r656039 - in /incubator/pig/branches/types: src/org/apache/pig/data/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/parser/ src/org/apache/pig/impl/logicalLayer/schema/ test/org/apache/pig/test/

Author: gates
Date: Tue May 13 15:45:16 2008
New Revision: 656039

URL: http://svn.apache.org/viewvc?rev=656039&view=rev
Log:
PIG-159 Santhosh's changes to modify AS to allow type definition and changes to expression operators to return a FieldSchema rather than a Schema.


Modified:
    incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOBinCond.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOConst.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataType.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataType.java Tue May 13 15:45:16 2008
@@ -339,6 +339,16 @@
         return ret;
     }
 
+    public static Map<String, Byte> genNameToTypeMap(){
+        byte[] types = genAllTypes();
+        String[] names = genAllTypeNames();
+        Map<String, Byte> ret = new HashMap<String, Byte>();
+        for(int i=0;i<types.length;i++){
+            ret.put(names[i], types[i]);
+        }
+        return ret;
+    }
+
     /**
      * Get the type name.
      * @param o Object to test.
@@ -430,6 +440,25 @@
     }
 
     /**
+     * Determine whether this data type has a schema.
+     * @param o Object to determine if it has a schema
+     * @return true if the type can have a valid schema (i.e., bag or tuple)
+     */
+    public static boolean isSchemaType(Object o) {
+        return isSchemaType(findType(o));
+    }
+
+    /**
+     * Determine whether this data type has a schema.
+     * @param dataType Type code to determine if it has a schema
+     * @return true if the type can have a valid schema (i.e., bag or tuple)
+     */
+    public static boolean isSchemaType(byte dataType) {
+        return ((dataType == BAG) || (dataType == TUPLE)); 
+    }
+
+    /**
+    /**
      * Compare two objects to each other.  This function is necessary
      * because there's no super class that implements compareTo.  This
      * function provides an (arbitrary) ordering of objects of different

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java Tue May 13 15:45:16 2008
@@ -18,7 +18,10 @@
 package org.apache.pig.impl.logicalLayer;
 
 import java.util.List;
+import java.util.ArrayList;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -26,6 +29,8 @@
 
     private static final long serialVersionUID = 2L;
     private static Log log = LogFactory.getLog(ExpressionOperator.class);
+	protected boolean mIsFieldSchemaComputed = false;
+	protected Schema.FieldSchema mFieldSchema = null;
 
     /**
      * @param plan
@@ -55,5 +60,54 @@
     public boolean supportsMultipleOutputs() {
         return false;
     }
+
+    @Override
+    public Schema getSchema() throws FrontendException{
+        return mSchema;
+    }
+
+	public abstract Schema.FieldSchema getFieldSchema() throws FrontendException;
+
+    /**
+     * Set the output field schema for this operator. If a field schema already
+     * exists, an attempt will be made to reconcile it with this new one.
+     * 
+     * @param fs
+     *            Field schema to set.
+     * @throws FrontendException
+     *             if there is already a schema and the existing schema cannot
+     *             be reconciled with this new schema.
+     */
+    public final void setFieldSchema(Schema.FieldSchema fs) throws FrontendException {
+        // In general, operators don't generate their schema until they're
+        // asked, so ask them to do it.
+		log.debug("Inside setFieldSchema");
+        try {
+            getFieldSchema();
+        } catch (FrontendException fee) {
+            // It's fine, it just means we don't have a schema yet.
+        }
+		log.debug("After getFieldSchema()");
+        if (null == mFieldSchema) {
+            log.debug("Operator schema is null; Setting it to new schema");
+            mFieldSchema = fs;
+        } else {
+            log.debug("Reconciling schema");
+			log.debug("mFieldSchema: " + mFieldSchema + " fs: " + fs);
+            //log.debug("mSchema: " + mSchema + " schema: " + schema);
+			try {
+				if(null != mFieldSchema.schema) {
+            		mFieldSchema.schema.reconcile(fs.schema);
+				} else {
+					mFieldSchema.schema = fs.schema;
+				}
+				mFieldSchema.type = fs.type;
+				mFieldSchema.alias = fs.alias;
+			} catch (ParseException pe) {
+				throw new FrontendException(pe.getMessage());
+			}
+        }
+    }
+
 }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAdd.java Tue May 13 15:45:16 2008
@@ -47,11 +47,16 @@
 
     @Override
     public Schema getSchema() {
+        return mSchema;
+    }
+
+    @Override
+    public Schema.FieldSchema getFieldSchema() {
         // TODO When tuple addition is implemented, getSchema should
         // compute the schema, store the computed schema and return
         // the computed schema
 
-        return mSchema;
+        return mFieldSchema;
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOAnd.java Tue May 13 15:45:16 2008
@@ -51,6 +51,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOBinCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOBinCond.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOBinCond.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOBinCond.java Tue May 13 15:45:16 2008
@@ -84,22 +84,27 @@
         v.visit(this);
     }
 
+	@Override
+	public Schema getSchema() throws FrontendException {
+		return mSchema;
+	}
+
     @Override
-    public Schema getSchema() throws FrontendException {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
 		//TODO
 		//We need a check of LHS and RHS schemas
-        if (!mIsSchemaComputed && (null == mSchema)) {
+        if (!mIsFieldSchemaComputed && (null == mFieldSchema)) {
             try {
-                mSchema = mLhsOp.getSchema();
+                mFieldSchema = mLhsOp.getFieldSchema();
                 //mSchema = mLhsPlan.getRoots().get(0).getSchema();
-                mIsSchemaComputed = true;
-            } catch (FrontendException ioe) {
-                mSchema = null;
-                mIsSchemaComputed = false;
-                throw ioe;
+                mIsFieldSchemaComputed = true;
+            } catch (FrontendException fee) {
+                mFieldSchema = null;
+                mIsFieldSchemaComputed = false;
+                throw fee;
             }
         }
-        return mSchema;
+        return mFieldSchema;
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java Tue May 13 15:45:16 2008
@@ -62,6 +62,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public String name() {
         return "Cast " + mKey.scope + "-" + mKey.id;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Tue May 13 15:45:16 2008
@@ -136,18 +136,24 @@
                 log.debug("GBY Input: " + op.getClass().getName());
 				for(LogicalPlan plan: mGroupByPlans.get(op)) {
 	                int position = 0;
-	                for(LogicalOperator eOp: plan.getRoots()) {
-	                    Schema eOpSchema = eOp.getSchema();
-	                    log.debug("Computing the lookup tables");
-	                    if (null != eOpSchema) {
-	                        for (String alias : eOpSchema.getAliases()) {
-	                            log.debug("Adding alias to GBY: " + alias);
-	                            groupByAliases.add(alias);
-	                            lookup.put(alias, false);
-	                            aliasExop.put(alias, (ExpressionOperator)eOp);                            
-	                            positionAlias.put(position, alias);
-	                        }
-	                    }
+	                for(LogicalOperator eOp: plan.getLeaves()) {
+						log.debug("Leaf: " + eOp);
+						Schema.FieldSchema fs = ((ExpressionOperator)eOp).getFieldSchema();
+						if(null != fs) {
+		                    Schema eOpSchema = fs.schema;
+		                    log.debug("Computing the lookup tables");
+		                    if (null != fs) {
+								String alias = fs.alias;
+		                        //for (String alias : eOpSchema.getAliases()) {
+								if(null != alias) {
+		                            log.debug("Adding alias to GBY: " + alias);
+		                            groupByAliases.add(alias);
+		                            lookup.put(alias, false);
+		                            aliasExop.put(alias, (ExpressionOperator)eOp);                            
+		                            positionAlias.put(position, alias);
+		                        }
+		                    }
+						}
 	                }
 	                ++position;
 				}
@@ -170,16 +176,30 @@
                                 ExpressionOperator eOp = (ExpressionOperator) (cEops.toArray())[0];
                                 if(null != eOp) {
                                     if(!lookup.get(alias)) {
-                                        groupByFss.add(new Schema.FieldSchema(alias, eOp.getSchema()));
-                                        lookup.put(alias, true);
+										Schema.FieldSchema fs = eOp.getFieldSchema();
+										if(null != fs) {
+											log.debug("Added fs with alias " + alias + " and fs.schema " + fs.schema);
+                                        	groupByFss.add(new Schema.FieldSchema(alias, fs.schema));
+                                        	lookup.put(alias, true);
+										} else {
+											log.debug("Added fs with alias " + alias + " and schema null");
+											groupByFss.add(new Schema.FieldSchema(alias, null));
+										}
                                     } else {
                                         if(j < aliases.length) {
                                             continue;
                                         } else {
                                             //we have seen this alias before
                                             //just add the schema of the expression operator with the null alias
-                                            groupByFss.add(new Schema.FieldSchema(null, eOp.getSchema()));
-                                            break;
+											Schema.FieldSchema fs = eOp.getFieldSchema();
+											if(null != fs) {
+												log.debug("Added fs with alias null and schema " + fs.schema);
+                                            	groupByFss.add(new Schema.FieldSchema(null, fs.schema));
+                                            } else {
+												log.debug("Added fs with alias null and schema null");
+												groupByFss.add(new Schema.FieldSchema(null, null));
+											}
+											break;
                                         }
                                     }
                                 } else {
@@ -200,6 +220,7 @@
                     //We have positions $1, $2, etc.
                     //The schema for these columns is the schema of the expression operatore
                     //and so the alias is null
+					log.debug("Added fs with alias null and type bytearray");
                     groupByFss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));                    
                 }
             }            
@@ -213,18 +234,21 @@
 				byte groupByType = groupByFss.get(0).type;
 				Schema groupSchema = groupByFss.get(0).schema;
 				log.debug("Type == " + DataType.findTypeName(groupByType));
+				/*
 				if(DataType.TUPLE == groupByType || DataType.BAG == groupByType) { 
 					if(null != groupSchema) {
 						Schema innerSchema = groupSchema.getFields().get(0).schema;
 						fss.add(new Schema.FieldSchema("group", innerSchema, groupByType));
+						log.debug("Printing the aliases of the single group by column");
+						groupSchema.printAliases();
 					} else {
 						fss.add(new Schema.FieldSchema("group", groupSchema, groupByType));
 					}
-					log.debug("Printing the aliases of the single group by column");
-					groupSchema.printAliases();
 				} else {
 					fss.add(new Schema.FieldSchema("group", groupByType));
 				}
+				*/
+				fss.add(new Schema.FieldSchema("group", groupSchema, groupByType));
 			} else {
             	fss.add(new Schema.FieldSchema("group", groupBySchema));
 			}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOConst.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOConst.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOConst.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOConst.java Tue May 13 15:45:16 2008
@@ -58,6 +58,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public String name() {
         return "Const " + mKey.scope + "-" + mKey.id;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java Tue May 13 15:45:16 2008
@@ -20,6 +20,11 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.io.IOException;
 
 import org.apache.pig.data.DataType;
@@ -59,26 +64,101 @@
     @Override
     public Schema getSchema() throws FrontendException {
         if (!mIsSchemaComputed && (null == mSchema)) {
-            // Get the schema of the parents
-            Collection<LogicalOperator> s = mPlan.getPredecessors(this);
-            List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>(s
-                    .size());
-            for (LogicalOperator op : s) {
+            Collection<LogicalOperator> pred = mPlan.getPredecessors(this);
+            List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
+			Map<Schema.FieldSchema, String> flattenAlias = new HashMap<Schema.FieldSchema, String>();
+			Map<String, Boolean> inverseFlattenAlias = new HashMap<String, Boolean>();
+			Map<String, Integer> aliases = new HashMap<String, Integer>();
+
+            for (LogicalOperator op : pred) {
                 String opAlias = op.getAlias();
-                if (op.getType() == DataType.TUPLE) {
-                    try {
-                        fss.add(new Schema.FieldSchema(opAlias, op
-                                        .getSchema()));
-                    } catch (FrontendException ioe) {
-                        mSchema = null;
-                        mIsSchemaComputed = false;
-                        throw ioe;
-                    }
-                } else {
-                    fss.add(new Schema.FieldSchema(opAlias, op.getType()));
-                }
+				Schema s = op.getSchema();
+
+				//need to extract the children and create the aliases
+				//assumption here is that flatten is only for one column
+				//i.e., flatten(A), flatten(A.x) and NOT
+				//flatten(B.(x,y,z))
+				if(null != s) {
+					for(Schema.FieldSchema fs: s.getFields()) {
+						log.debug("fs: " + fs);
+						log.debug("fs.alias: " + fs.alias);
+						if(null != fs.alias) {
+							String disambiguatorAlias = opAlias + "::" + fs.alias;
+							Schema.FieldSchema newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
+							fss.add(newFs);
+							Integer count;
+							count = aliases.get(fs.alias);
+							if(null == count) {
+								aliases.put(fs.alias, 0);
+							} else {
+								aliases.put(fs.alias, ++count);
+							}
+							count = aliases.get(disambiguatorAlias);
+							if(null == count) {
+								aliases.put(disambiguatorAlias, 0);
+							} else {
+								aliases.put(disambiguatorAlias, ++count);
+							}
+							flattenAlias.put(newFs, fs.alias);
+							inverseFlattenAlias.put(fs.alias, true);
+							//it's fine if there are duplicates
+							//we just need to record if its due to
+							//flattening
+						} else {
+							Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+							fss.add(newFs);
+						}
+					}
+				} else {
+					Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+					fss.add(newFs);
+				}
             }
+
+			//check for duplicate column names and throw an error if there are duplicates
+			//ensure that flatten gets rid of duplicate column names when the checks are
+			//being done
+			log.debug(" flattenAlias: " + flattenAlias);
+			log.debug(" inverseFlattenAlias: " + inverseFlattenAlias);
+			log.debug(" aliases: " + aliases);
+			log.debug(" fss.size: " + fss.size());
+			boolean duplicates = false;
+			Set<String> duplicateAliases = new HashSet<String>();
+			for(String alias: aliases.keySet()) {
+				Integer count = aliases.get(alias);
+				if(count > 0) {
+					Boolean inFlatten = false;
+					log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
+					inFlatten = inverseFlattenAlias.get(alias);
+					log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
+					if((null != inFlatten) && (!inFlatten)) {
+						duplicates = true;
+						duplicateAliases.add(alias);
+					}
+				}
+			}
+			if(duplicates) {
+				String errMessage = "Found duplicates in schema ";
+				if(duplicateAliases.size() > 0) {
+					Iterator<String> iter = duplicateAliases.iterator();
+					errMessage += ": " + iter.next();
+					while(iter.hasNext()) {
+						errMessage += ", " + iter.next();
+					}
+				}
+				throw new FrontendException(errMessage);
+			}
             mSchema = new Schema(fss);
+			//add the aliases that are unique after flattening
+			for(Schema.FieldSchema fs: mSchema.getFields()) {
+				String alias = flattenAlias.get(fs);
+				Integer count = aliases.get(alias);
+				if (null == count) count = 0;
+				log.debug("alias: " + alias);
+				if((null != alias) && (count == 0)) {
+					mSchema.addAlias(alias, fs);
+				}
+			}
             mIsSchemaComputed = true;
         }
         return mSchema;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java Tue May 13 15:45:16 2008
@@ -61,7 +61,7 @@
             try {
                 LogicalOperator op = s.iterator().next();
                 if (null == op) {
-                    log.info("getSchema: Operator not in plan");
+                    log.debug("getSchema: Operator not in plan");
                     throw new FrontendException("Could not find operator in plan");
                 }
                 mSchema = s.iterator().next().getSchema();

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODivide.java Tue May 13 15:45:16 2008
@@ -47,14 +47,18 @@
 
     @Override
     public Schema getSchema() {
-        // TODO When tuple division is implemented, getSchema should
-        // compute the schema, store the computed schema and return
-        // the computed schema
-
         return mSchema;
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        // TODO When tuple division by a scalar is implemented, getFieldSchema should
+        // compute the schema, store and return the computed schema 
+
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEqual.java Tue May 13 15:45:16 2008
@@ -51,6 +51,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java Tue May 13 15:45:16 2008
@@ -21,9 +21,9 @@
 import java.util.List;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.HashMap;
-
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -111,83 +111,138 @@
                     mGeneratePlans.size());
 
             for (LogicalPlan plan : mGeneratePlans) {
-				log.debug("Number of roots in " + plan + " = " + plan.getRoots().size());
-				for(int i = 0; i < plan.getRoots().size(); ++i) {
-					log.debug("Root" + i + "= " + plan.getRoots().get(i));
+				log.debug("Number of leaves in " + plan + " = " + plan.getLeaves().size());
+				for(int i = 0; i < plan.getLeaves().size(); ++i) {
+					log.debug("Leaf" + i + "= " + plan.getLeaves().get(i));
 				}
-				LogicalOperator op = plan.getRoots().get(0);
+				//LogicalOperator op = plan.getRoots().get(0);
+				LogicalOperator op = plan.getLeaves().get(0);
                 log.debug("op: " + op.getClass().getName() + " " + op);
 			}
-			log.debug("Printed the roots of the generate plans");
+			log.debug("Printed the leaves of the generate plans");
 
 			Map<Schema.FieldSchema, String> flattenAlias = new HashMap<Schema.FieldSchema, String>();
+			Map<String, Boolean> inverseFlattenAlias = new HashMap<String, Boolean>();
+			Map<String, Integer> aliases = new HashMap<String, Integer>();
 
             for (int planCtr = 0; planCtr < mGeneratePlans.size(); ++planCtr) {
 				LogicalPlan plan = mGeneratePlans.get(planCtr);
-				LogicalOperator op = plan.getRoots().get(0);
+				LogicalOperator op = plan.getLeaves().get(0);
                 log.debug("op: " + op.getClass().getName() + " " + op);
-                Set<String> aliases;
-                Iterator<String> iter = null;
-                String opSchemaAlias = null;
-                Schema s = null;
+                //Set<String> aliases;
+                //Iterator<String> iter = null;
+                //String opSchemaAlias = null;
+                //Schema s = null;
+				Schema.FieldSchema planFs;
 
                 try {
-                    s = op.getSchema();
-                    if (null != s) {
-                        log.debug("Printing aliases in LOGenerate");
-                        s.printAliases();
-                        aliases = op.getSchema().getAliases();
-                        iter = aliases.iterator();
-                    }
-                } catch (FrontendException ioe) {
-                    mSchema = null;
-                    mIsSchemaComputed = false;
-                    throw ioe;
-                }
-
-                // ASSUMPTION
-                // Here I am assuming that the LOProject does not have multiple
-                // columns in the project, i.e., generate A.($1,$2,$3) as
-                // (name,age,gpa)
-
-                if ((null != iter) && (iter.hasNext())) {
-                    opSchemaAlias = iter.next();
-                    log.debug("iter.next: " + opSchemaAlias);
-                }
-                
-
-                log.debug("Type: " + DataType.findTypeName(op.getType())
-                        + " Alias: " + opSchemaAlias);
-
-                if (mFlatten.get(planCtr)) {
-                    log.debug("Flatten");
-                    for(Schema.FieldSchema fs: s.getFields()) {
-						Schema internalSchema = fs.schema;
-						Schema.FieldSchema newFs;
-						log.debug("Flatten level 1 internalSchema: " + internalSchema);
-						if(null != internalSchema) {
-							for(Schema.FieldSchema internalfs: internalSchema.getFields()) {
-                        		newFs = new Schema.FieldSchema(internalfs.alias, internalfs.schema);
+	                planFs = ((ExpressionOperator)op).getFieldSchema();
+					if(null != planFs) {
+						log.debug("planFs alias: " + planFs.alias);
+						if(mFlatten.get(planCtr)) {
+							//need to extract the children and create the aliases
+							//assumption here is that flatten is only for one column
+							//i.e., flatten(A), flatten(A.x) and NOT
+							//flatten(B.(x,y,z))
+							Schema s = planFs.schema;
+							if(null != s) {
+								for(Schema.FieldSchema fs: s.getFields()) {
+									log.debug("fs: " + fs);
+									log.debug("fs.alias: " + fs.alias);
+									if(null != fs.alias) {
+										String disambiguatorAlias = planFs.alias + "::" + fs.alias;
+										Schema.FieldSchema newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
+										fss.add(newFs);
+										Integer count;
+										count = aliases.get(fs.alias);
+										if(null == count) {
+											aliases.put(fs.alias, 0);
+										} else {
+											aliases.put(fs.alias, ++count);
+										}
+										count = aliases.get(disambiguatorAlias);
+										if(null == count) {
+											aliases.put(disambiguatorAlias, 0);
+										} else {
+											aliases.put(disambiguatorAlias, ++count);
+										}
+										flattenAlias.put(newFs, fs.alias);
+										inverseFlattenAlias.put(fs.alias, true);
+										//it's fine if there are duplicates
+										//we just need to record if its due to
+										//flattening
+									} else {
+										Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+										fss.add(newFs);
+									}
+								}
+							} else {
+								Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
 								fss.add(newFs);
-								log.debug("Flatten alias: " + opSchemaAlias+"::"+internalfs.alias);
-								flattenAlias.put(newFs, opSchemaAlias+"::"+internalfs.alias);
+							}
+						} else {
+							//just populate the schema with the field schema of the expression operator
+	                   		fss.add(planFs);
+							if(null != planFs.alias) {
+								Integer count = aliases.get(planFs.alias);
+								if(null == count) {
+									aliases.put(planFs.alias, 0);
+								} else {
+									aliases.put(planFs.alias, ++count);
+								}
 							}
 						}
-						else {
-                        	newFs= new Schema.FieldSchema(fs.alias, fs.schema);
-                        	fss.add(newFs);
-							log.debug("Flatten alias: " + opSchemaAlias+"::"+fs.alias);
-							flattenAlias.put(newFs, opSchemaAlias+"::"+fs.alias);
-						}
-                    }
-                } else {
-                    fss.add(new Schema.FieldSchema(opSchemaAlias, s));
+					} else {
+						//did not get a valid list of field schemas
+						fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+					}
+                } catch (FrontendException fee) {
+                    mSchema = null;
+                    mIsSchemaComputed = false;
+                    throw fee;
                 }
             }
+			//check for duplicate column names and throw an error if there are duplicates
+			//ensure that flatten gets rid of duplicate column names when the checks are
+			//being done
+			log.debug(" flattenAlias: " + flattenAlias);
+			log.debug(" inverseFlattenAlias: " + inverseFlattenAlias);
+			log.debug(" aliases: " + aliases);
+			log.debug(" fss.size: " + fss.size());
+			boolean duplicates = false;
+			Set<String> duplicateAliases = new HashSet<String>();
+			for(String alias: aliases.keySet()) {
+				Integer count = aliases.get(alias);
+				if(count > 0) {
+					Boolean inFlatten = false;
+					log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
+					inFlatten = inverseFlattenAlias.get(alias);
+					log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
+					if((null != inFlatten) && (!inFlatten)) {
+						duplicates = true;
+						duplicateAliases.add(alias);
+					}
+				}
+			}
+			if(duplicates) {
+				String errMessage = "Found duplicates in schema ";
+				if(duplicateAliases.size() > 0) {
+					Iterator<String> iter = duplicateAliases.iterator();
+					errMessage += ": " + iter.next();
+					while(iter.hasNext()) {
+						errMessage += ", " + iter.next();
+					}
+				}
+				throw new FrontendException(errMessage);
+			}
             mSchema = new Schema(fss);
+			//add the aliases that are unique after flattening
 			for(Schema.FieldSchema fs: mSchema.getFields()) {
 				String alias = flattenAlias.get(fs);
-				if(null != alias) {
+				Integer count = aliases.get(alias);
+				if (null == count) count = 0;
+				log.debug("alias: " + alias);
+				if((null != alias) && (count == 0)) {
 					mSchema.addAlias(alias, fs);
 				}
 			}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThan.java Tue May 13 15:45:16 2008
@@ -51,6 +51,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGreaterThanEqual.java Tue May 13 15:45:16 2008
@@ -51,6 +51,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThan.java Tue May 13 15:45:16 2008
@@ -51,6 +51,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLesserThanEqual.java Tue May 13 15:45:16 2008
@@ -51,6 +51,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMapLookup.java Tue May 13 15:45:16 2008
@@ -95,19 +95,23 @@
 
     @Override
     public Schema getSchema() {
-        if (!mIsSchemaComputed && (null == mSchema)) {
+        return mSchema;
+    }
+
+    @Override
+    public Schema.FieldSchema getFieldSchema() {
+        if (!mIsFieldSchemaComputed && (null == mFieldSchema)) {
             Schema.FieldSchema fss;
-            if (DataType.findType(mValueType) == DataType.TUPLE) {
+            if (DataType.isSchemaType(mValueType)) {
                 fss = new Schema.FieldSchema(null, mValueSchema);
             } else {
                 fss = new Schema.FieldSchema(null, DataType
                         .findType(mValueType));
             }
 
-            mSchema = new Schema(fss);
-            mIsSchemaComputed = true;
+            mIsFieldSchemaComputed = true;
         }
-        return mSchema;
+        return mFieldSchema;
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMod.java Tue May 13 15:45:16 2008
@@ -51,6 +51,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOMultiply.java Tue May 13 15:45:16 2008
@@ -47,14 +47,18 @@
 
     @Override
     public Schema getSchema() {
-        // TODO When tuple multiplication is implemented, getSchema should
-        // compute the schema, store the computed schema and return
-        // the computed schema
-
         return mSchema;
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        // TODO When tuple multiplication is implemented, getFieldSchema should
+        // compute the schema, store and return the computed schema
+
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONegative.java Tue May 13 15:45:16 2008
@@ -46,6 +46,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONot.java Tue May 13 15:45:16 2008
@@ -49,6 +49,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LONotEqual.java Tue May 13 15:45:16 2008
@@ -51,6 +51,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOOr.java Tue May 13 15:45:16 2008
@@ -51,6 +51,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java Tue May 13 15:45:16 2008
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.commons.logging.Log;
@@ -150,8 +151,8 @@
     }
 
     @Override
-    public Schema getSchema() throws FrontendException {
-        log.debug("Inside getSchema");
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
+        log.debug("Inside getFieldSchemas");
 		log.debug("Number of columns: " + mProjection.size());
         for (int i : mProjection) {
             log.debug("Column: " + i);
@@ -160,7 +161,7 @@
 		log.debug("expressionOperator = " + expressionOperator);
 		log.debug("mIsStar: " + mIsStar);
 
-        if (!mIsSchemaComputed && (null == mSchema)) {
+        if (!mIsFieldSchemaComputed && (null == mFieldSchema)) {
 
             if (mIsStar) {
                 log.debug("mIsStar is true");
@@ -168,142 +169,111 @@
                     if (null != expressionOperator) {
                         log.debug("expressionOperator is not null "
                                 + expressionOperator.getClass().getName() + " " + expressionOperator);
-                        mSchema = expressionOperator.getSchema();
-                    }
-                    mIsSchemaComputed = true;
-                } catch (FrontendException ioe) {
-                    mSchema = null;
-                    mIsSchemaComputed = false;
-                    throw ioe;
+						if(!mSentinel) {
+							//we have an expression operator and hence a list of field schemas
+							mFieldSchema = ((ExpressionOperator)expressionOperator).getFieldSchema();
+						} else {
+							//we have a relational operator as input and hence a schema
+							log.debug("expression operator alias: " + expressionOperator.getAlias());
+							log.debug("expression operator schema: " + expressionOperator.getSchema());
+							log.debug("expression operator type: " + expressionOperator.getType());
+							//TODO
+							//the type of the operator will be unknown. when type checking is in place
+							//add the type of the operator as a parameter to the fieldschema creation
+							//mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema(), expressionOperator.getType());
+							mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema());
+                    	}
+					} else {
+						log.warn("The input for a projection operator cannot be null");
+					}
+                    mIsFieldSchemaComputed = true;
+                } catch (FrontendException fee) {
+                    mFieldSchema = null;
+                    mIsFieldSchemaComputed = false;
+                    throw fee;
                 }
                 log.debug("mIsStar is true, returning schema of expressionOperator");
                 log.debug("Exiting getSchema()");
-                return mSchema;
+                return mFieldSchema;
             } else {
-                List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>(
-                        mProjection.size());
-
-                if (mProjection.size() == 1) {
-                    log.debug("Only one element");
-                    try {
-                        Schema s = expressionOperator.getSchema();
-                        if (null != s) {
-							log.debug("Getting the fieldschema for column: " + mProjection.get(0));
-                            Schema.FieldSchema fs = null; 
+				//it's a list of n columns to project, which may be just a single column
+                List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>(mProjection.size());
+				try {
+	                if (null != expressionOperator) {
+	                    log.debug("expressionOperator is not null");
+						if(mProjection.size() == 1) {
+							//if there is only one element then extract and return the field schema
+							log.debug("Only one element");
 							if(!mSentinel) {
-								log.debug("We are in the outer part of the projection");
-								fs = s.getField(0);
-								log.debug("fs in outer part: " + fs);
-								s = fs.schema;
-								log.debug("s: " + s);
+								log.debug("Input is an expression operator");
+								Schema.FieldSchema expOpFs = ((ExpressionOperator)expressionOperator).getFieldSchema();
+								if(null != expOpFs) {
+									Schema s = expOpFs.schema;
+									if(null != s) {
+										mFieldSchema = s.getField(mProjection.get(0));
+									} else {
+										mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+									}
+								} else {
+									mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+								}
+							} else {
+								log.debug("Input is a logical operator");
+	                       		Schema s = expressionOperator.getSchema();
+	                    		log.debug("s: " + s);
 								if(null != s) {
-									fs = s.getField(mProjection.get(0));
-									log.debug("fs in outer part after unwrapping: " + fs);
+	                            	mFieldSchema = s.getField(mProjection.get(0));
+									log.debug("mFieldSchema alias: " + mFieldSchema.alias);
+									log.debug("mFieldSchema schema: " + mFieldSchema.schema);
+								} else {
+									mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+								}
+							}
+							mIsFieldSchemaComputed = true;
+							return mFieldSchema;
+						}
+						
+	                    for (int colNum : mProjection) {
+	                        log.debug("Col: " + colNum);
+							if(!mSentinel) {
+								Schema.FieldSchema expOpFs = ((ExpressionOperator)expressionOperator).getFieldSchema();
+								if(null != expOpFs) {
+									Schema s = expOpFs.schema;
+									if(null != s) {
+										fss.add(s.getField(colNum));
+									} else {
+										fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+									}
 								} else {
-									fs = null;
-									log.debug("fs in outer part after unwrapping: " + fs);
+									fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
 								}
 							} else {
-								log.debug("We are in the inner most part of the projection");
-								fs = s.getField(mProjection.get(0));
+								Schema s = expressionOperator.getSchema();
+								if(null != s) {
+									fss.add(s.getField(colNum));
+								} else {
+									fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+								}
 							}
-                            if (null != fs) {
-                                log.debug("fs.type: "
-                                        + DataType.findTypeName(fs.type)
-                                        + " fs.schema: " + fs.schema);
-                                if (fs.type == DataType.BAG
-                                        || fs.type == DataType.TUPLE) {
-                                    if (null != fs.schema) {
-                                        log.debug("fs.schema aliases");
-                                        fs.schema.printAliases();
-                                    }
-                                    
-                                    //mSchema = fs.schema;
-                                    mSchema = new Schema(new Schema.FieldSchema(fs.alias, fs.schema));
-                                    //mSchema = new Schema(fs);
-                                } else {
-                                    mSchema = new Schema(fs);
-                                }
-                            } else {
-                            	log.debug("expressionOperator.schema is null");
-                            	mSchema = new Schema(new Schema.FieldSchema(null,
-                                    DataType.BYTEARRAY));
-                            }
-                        } else {
-                            log.debug("expressionOperator.schema is null");
-                            mSchema = new Schema(new Schema.FieldSchema(null,
-                                    DataType.BYTEARRAY));
-                        }
-                    } catch (Exception e) {
-                        mSchema = null;
-                        mIsSchemaComputed = false;
-                        throw new FrontendException(e.getMessage());
-                    }
-                    mIsSchemaComputed = true;
-                    log.debug("Exiting getSchema");
-                    return mSchema;
-                }
-
-                if (null != expressionOperator) {
-                    log.debug("expressionOperator is not null");
-
-                    Schema s = expressionOperator.getSchema();
-                    log.debug("s: " + s);
-                    for (int colNum : mProjection) {
-                        log.debug("Col: " + colNum);
-                        if (null != s) {
-                            try {
-                                Schema.FieldSchema tempFs = s.getField(0);
-                                if (null != tempFs) {
-                                    if(null != tempFs.schema) {
-                                        Schema.FieldSchema fs = tempFs.schema.getField(colNum);
-                                        log.debug("fs.type: "
-                                                + DataType.findTypeName(fs.type)
-                                                + " fs.schema: " + fs.schema);
-                                        if(fs.type == DataType.BAG || fs.type == DataType.TUPLE) {
-                                            if (null != fs.schema) {
-                                                log.debug("fs.schema aliases");
-                                                fs.schema.printAliases();
-                                            }
-                                                fss.add(new Schema.FieldSchema(fs.alias, fs.schema));
-                                        } else {
-                                            fss.add(fs);
-                                        }
-                                    } else {
-                                        log.debug("expressionOperator.fs.schema is null");
-                                        fss.add(new Schema.FieldSchema(null,
-                                                DataType.BYTEARRAY));
-                                    }
-                                } else {
-									log.debug("Could not find column " + colNum+ " in schema");
-                                    throw new FrontendException(
-                                            "Could not find column " + colNum
-                                                    + " in schema");
-                                }
-                            } catch (Exception e) {
-								log.debug("Caught exception: " + e.getMessage());
-                                mSchema = null;
-                                mIsSchemaComputed = false;
-                                throw new FrontendException(e.getMessage());
-                            }
-                        } else {
-                            log.debug("expressionOperator.schema is null");
-                            fss.add(new Schema.FieldSchema(null,
-                                    DataType.BYTEARRAY));
-                        }
-                    }
-                } else {
-                    log.debug("expressionOperator is null");
-                    fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
-                }
-                mSchema = new Schema(fss);
-                mIsSchemaComputed = true;
-                log.debug("mIsStar is false, returning computed schema of expressionOperator");
+						}
+	
+	                } else {
+						log.warn("The input for a projection operator cannot be null");
+	                    //fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+	                }
+				} catch(ParseException pe) {
+					mFieldSchema = null;
+					mIsFieldSchemaComputed = false;
+					throw new FrontendException(pe.getMessage());
+				}
+				mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), new Schema(fss));
+				mIsFieldSchemaComputed = true;
+                log.debug("mIsStar is false, returning computed field schema of expressionOperator");
             }
         }
 
-        log.debug("Exiting getSchema");
-        return mSchema;
+        log.debug("Exiting getFieldSchema");
+        return mFieldSchema;
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORegexp.java Tue May 13 15:45:16 2008
@@ -66,7 +66,7 @@
     
     @Override
     public String name() {
-        return "Project " + mKey.scope + "-" + mKey.id;
+        return "Regexp " + mKey.scope + "-" + mKey.id;
     }
 
     @Override
@@ -80,6 +80,11 @@
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSubtract.java Tue May 13 15:45:16 2008
@@ -47,11 +47,16 @@
 
     @Override
     public Schema getSchema() {
+        return mSchema;
+    }
+
+    @Override
+    public Schema.FieldSchema getFieldSchema() {
         // TODO When tuple subtraction is implemented, getSchema should
         // compute the schema, store the computed schema and return
         // the computed schema
 
-        return mSchema;
+        return mFieldSchema;
     }
 
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java Tue May 13 15:45:16 2008
@@ -19,6 +19,8 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Collection;
+import java.util.Iterator;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
@@ -54,13 +56,34 @@
     }
 
     @Override
-    public Schema getSchema() {
-        if (null == mSchema) {
-            // TODO FIX
-            // The schema merge operation needs to be implemented in
-            // order to compute the schema of the union
-            //If schemas are the same then return one of the schemas else 
-            //return null
+    public Schema getSchema() throws FrontendException {
+        if (!mIsSchemaComputed && (null == mSchema)) {
+            Collection<LogicalOperator> s = mPlan.getPredecessors(this);
+			log.debug("Number of predecessors in the graph: " + s.size());
+            try {
+				Iterator<LogicalOperator> iter = s.iterator();
+                LogicalOperator op = iter.next();
+                if (null == op) {
+                    log.debug("getSchema: Operator not in plan");
+                    throw new FrontendException("Could not find operator in plan");
+                }
+                mSchema = op.getSchema();
+				log.debug("Printing aliases");
+				mSchema.printAliases();
+				while(iter.hasNext()) {
+                	op = iter.next();
+					if(null == mSchema) {
+						log.debug("Schema is null, cannot perform schema merge");
+						throw new FrontendException("Schema is null, cannot perform schema merge");
+					}
+					mSchema = mSchema.merge(op.getSchema(), false);
+				}
+				mIsSchemaComputed = true;
+            } catch (FrontendException fe) {
+                mSchema = null;
+                mIsSchemaComputed = false;
+                throw fe;
+            }
         }
         return mSchema;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java Tue May 13 15:45:16 2008
@@ -69,13 +69,19 @@
 
     @Override
     public Schema getSchema() {
-        if (mSchema == null) {
-            mSchema = new Schema(new Schema.FieldSchema(null, mType));
-        }
         return mSchema;
     }
 
     @Override
+    public Schema.FieldSchema getFieldSchema() {
+        if (!mIsFieldSchemaComputed && (mFieldSchema == null)) {
+            mFieldSchema = new Schema.FieldSchema(null, mType);
+			mIsFieldSchemaComputed = true;
+        }
+        return mFieldSchema;
+    }
+
+    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue May 13 15:45:16 2008
@@ -319,7 +319,8 @@
 	}
 
 	 //END
-
+	
+	private static Map<String, Byte> nameToTypeMap = DataType.genNameToTypeMap();
 }
 
 	
@@ -371,6 +372,15 @@
 TOKEN : { <EVAL : "eval"> }
 TOKEN : { <ASC : "asc"> }
 TOKEN : { <DESC : "desc"> }
+TOKEN : { <INT : "integer"> }
+TOKEN : { <LONG : "long"> }
+TOKEN : { <FLOAT : "float"> }
+TOKEN : { <DOUBLE : "double"> }
+TOKEN : { <CHARARRAY : "chararray"> }
+TOKEN : { <BYTEARRAY : "bytearray"> }
+TOKEN : { <BAG : "bag"> }
+TOKEN : { <TUPLE : "tuple"> }
+TOKEN : { <MAP : "map"> }
 
 TOKEN:
 {
@@ -383,8 +393,8 @@
 // Define Numeric Constants
 TOKEN :
 {
-	< NUMBER: <INTEGER> | <FLOAT> | <FLOAT> ( ["e","E"] ([ "-","+"])? <FLOAT> )?>
-| 	< #FLOAT: <INTEGER> ( "." <INTEGER> )? | "." <INTEGER> >
+	< NUMBER: <INTEGER> | <FLOATINGPOINT> | <FLOATINGPOINT> ( ["e","E"] ([ "-","+"])? <FLOATINGPOINT> )?>
+| 	< #FLOATINGPOINT: <INTEGER> ( "." <INTEGER> )? | "." <INTEGER> >
 | 	< INTEGER: ( <DIGIT> )+ >
 }
 
@@ -1355,7 +1365,7 @@
 	ExpressionOperator item; 
 	Schema schema = null; 
 	Token t; 
-	Schema.FieldSchema fs; 
+	Schema.FieldSchema fs = null; 
 	boolean flatten = false;
 	log.trace("Entering FlattenedGenerateItem");
 }
@@ -1379,16 +1389,12 @@
 		}
 	)
 	)
-	[ <AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" | fs = AtomSchema() {schema = new Schema(fs);})]
+	[ <AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {if(schema.size() > 1) {throw new ParseException("Schema mismatch");} fs = schema.getFields().get(0);} | fs = AtomSchema() )]
 	)
 	{
 		log.debug("item: " + item.getClass().getName());
-		if(null != schema) {
-			item.setSchema(schema); 
-			log.debug("Printing Schema Aliases from parser"); 
-			schema.printAliases();
-			log.debug("Printing Schema Aliases from item"); 
-			item.getSchema().printAliases();			
+		if(null != fs) {
+			item.setFieldSchema(fs); 
 		}
 		flattenList.add(flatten);
 		log.trace("Exiting FlattenedGenerateItem");
@@ -1535,22 +1541,26 @@
 	)
 	(
 		{ 
-			subSchema = item.getSchema(); 
+			Schema.FieldSchema fs = item.getFieldSchema(); 
+			subSchema = fs.schema; 
 			//TODO
 			//HACK for the schema problems with LOProject
 			//Check the schema to see if the constituent
 			//field is a bag or a tuple/ If so, then get
 			//that schema and send it out instead of the
 			//actual schema
-			log.debug("Printing subSchema Aliases");
-			subSchema.printAliases();
-			
+			log.debug("subSchema: " + subSchema);
+			if(null != subSchema) {
+				log.debug("Printing subSchema Aliases");
+				subSchema.printAliases();
+			}
+			/*	
 			log.debug("Printing the field schemas of subSchema");
 			for(Schema.FieldSchema fs: subSchema.getFields()) {
 				log.debug("fs: " + fs);
 				subSchema = fs.schema;
 			}
-			
+			*/
 		}
 		( 
 			"." projection = BracketedSimpleProj(subSchema,lp,item) 
@@ -1563,10 +1573,10 @@
 |		( "#" t = <QUOTEDSTRING> { 
 			assertAtomic(item, false);
 			ExpressionOperator mapLookup = new LOMapLookup(lp, new OperatorKey(scope, getNextId()), item, unquote(t.image), DataType.BYTEARRAY, null);
-			item = mapLookup;
 			lp.add(mapLookup);
 			log.debug("BaseEvalSpec: Added operator " + mapLookup.getClass().getName() + " " + mapLookup + " to logical plan " + lp);
 			lp.connect(item, mapLookup);
+			item = mapLookup;
 			log.debug("BaseEvalSpec: Connected operator " + item.getClass().getName() + " " + item+ " to " + mapLookup + " logical plan " + lp);
 		}
 		)
@@ -1624,7 +1634,7 @@
 			lp.connect(exprOp, userFunc);
 			log.debug("FuncEvalSpec: Connected operator " + exprOp.getClass().getName() + " " + exprOp+ " to " + userFunc + " logical plan " + lp);
 		}
-		log.trace("Exiting BinCond");
+		log.trace("Exiting FuncEvalSpec");
 		return userFunc;
 	}
 }
@@ -1665,16 +1675,61 @@
 		log.debug("EvalArgsItem: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp);
 	}
 	)
-	{log.trace("Entering EvalArgsItem");return item;}
+	{log.trace("Exiting EvalArgsItem");return item;}
+}
+
+byte Type() : 
+{ 
+	log.trace("Entering Type");
+	byte type;
+}
+{
+ (type = BasicType() | type = CompositeType()) 
+ {
+	log.trace("Exiting Type");
+ 	return type;
+ }	
 }
 
+byte CompositeType() : 
+{ 
+	log.trace("Entering CompositeType");
+	Token t = null;
+	byte type;
+}
+{
+ (t = <MAP>| t = <BAG>| t = <TUPLE>) 
+ {
+	log.debug("t: " + t + " type: " + nameToTypeMap.get(t.image.toUpperCase()));
+ 	type = nameToTypeMap.get(t.image.toUpperCase());
+	log.trace("Exiting CompositeType");
+ 	return type;
+ }
+}
+
+byte BasicType() : 
+{ 
+	log.trace("Entering BasicType");
+	Token t = null;
+	byte type;
+}
+{
+ (t = <INT>| t = <LONG>| t = <FLOAT>| t = <DOUBLE>| t = <CHARARRAY>| t = <BYTEARRAY>) 
+ {
+	log.debug(" nameToTypeMap: " + nameToTypeMap);
+	log.debug("t: " + t + " type: " + nameToTypeMap.get(t.image.toUpperCase()));
+ 	type = nameToTypeMap.get(t.image.toUpperCase());
+	log.trace("Exiting BasicType");
+ 	return type;
+ }
+}
 
 Schema.FieldSchema FieldSchema() : 
 {
 	Token t1; 
 	Schema item = null; 
 	Schema.FieldSchema fs = null; 
-	log.trace("Entering Schema");
+	log.trace("Entering FieldSchema");
 }
 {
 	(
@@ -1683,21 +1738,28 @@
 |	LOOKAHEAD(AtomSchema()) fs = AtomSchema()
 	)
 	//{log.debug("Printing Aliases"); item.printAliases();log.trace("Exiting Schema");return item;}
-	{log.trace("Exiting Schema");return fs;}
+	{log.trace("Exiting FieldSchema");return fs;}
 }
 
 Schema.FieldSchema AtomSchema() : 
 {
-	Token t1;
+	Token t1 = null;
+	byte type = DataType.BYTEARRAY;
+	Schema.FieldSchema fs;
 	log.trace("Entering AtomSchema");
 }
 {
-	(  ( t1 = <IDENTIFIER> ) 
+	(  ( t1 = <IDENTIFIER> [":" type = BasicType() ] )
 		{ 
-			log.debug("AtomSchema: " + t1.image);
+			if(null != t1) {
+				log.debug("AtomSchema: " + t1.image);
+				fs = new Schema.FieldSchema(t1.image, type); 
+			} else {
+				fs = new Schema.FieldSchema(null, type); 
+			}
 			
 			log.trace("Exiting AtomSchema");
-			return new Schema.FieldSchema(t1.image, DataType.BYTEARRAY); 
+			return fs;
 		} 
 	)
 }
@@ -1710,7 +1772,7 @@
 	log.trace("Entering SchemaTuple");
 }
 { 
-	[( t1 = <IDENTIFIER> ) ":"] "(" list = TupleSchema() ")"	 
+	( t1 = <IDENTIFIER> )  [":" <TUPLE>] "(" list = TupleSchema() ")" 
 	{
 		if (null != t1) {
 			log.debug("TUPLE alias " + t1.image);
@@ -1731,7 +1793,7 @@
 	log.trace("Entering SchemaBag");
 }
 { 
-	[( t1 = <IDENTIFIER> ) ":"] "[" list = TupleSchema() "]"	 
+	( t1 = <IDENTIFIER> ) [":" <BAG>] "{" list = TupleSchema() "}" 
 	{
 		if (null != t1) {
 			log.debug("BAG alias " + t1.image);
@@ -1780,7 +1842,7 @@
 			throw new ParseException(e.getMessage());
 		}
 		
-		log.trace("Exiting EvalFunc");
+		log.trace("Exiting EvalFunction");
 		return funcName;
 	}
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Tue May 13 15:45:16 2008
@@ -109,12 +109,15 @@
 
         @Override
         public boolean equals(Object other) {
-            if (!(other instanceof FieldSchema)) return false;
-            FieldSchema fs = (FieldSchema)other;
-            // Fields can have different names and still be equal.  But
+            if (!(other instanceof FieldSchema))
+                return false;
+            FieldSchema fs = (FieldSchema) other;
+            // Fields can have different names and still be equal. But
             // types and schemas (if they're a tuple) must match.
-            if (type != fs.type) return false;
-            if (schema != fs.schema) return false;
+            if (type != fs.type)
+                return false;
+            if (schema != fs.schema)
+                return false;
 
             return true;
         }
@@ -164,7 +167,7 @@
     private Map<String, FieldSchema> mAliases;
     private MultiMap<FieldSchema, String> mFieldSchemas;
     private static Log log = LogFactory.getLog(Schema.class);
- 
+
     public Schema() {
         mFields = new ArrayList<FieldSchema>();
         mAliases = new HashMap<String, FieldSchema>();
@@ -172,7 +175,8 @@
     }
 
     /**
-     * @param fields List of field schemas that describes the fields.
+     * @param fields
+     *            List of field schemas that describes the fields.
      */
     public Schema(List<FieldSchema> fields) {
         mFields = fields;
@@ -190,7 +194,9 @@
 
     /**
      * Create a schema with only one field.
-     * @param fieldSchema field to put in this schema.
+     * 
+     * @param fieldSchema
+     *            field to put in this schema.
      */
     public Schema(FieldSchema fieldSchema) {
         mFields = new ArrayList<FieldSchema>(1);
@@ -207,7 +213,9 @@
 
     /**
      * Given an alias name, find the associated FieldSchema.
-     * @param alias Alias to look up.
+     * 
+     * @param alias
+     *            Alias to look up.
      * @return FieldSchema, or null if no such alias is in this tuple.
      */
     public FieldSchema getField(String alias) {
@@ -243,20 +251,26 @@
     }
 
     /**
-     * Reconcile this schema with another schema.  The schema being reconciled
-     * with should have the same number of columns.  The use case is where a
-     * schema already exists but may not have alias and or type information.  If
+     * Reconcile this schema with another schema. The schema being reconciled
+     * with should have the same number of columns. The use case is where a
+     * schema already exists but may not have alias and or type information. If
      * an alias exists in this schema and a new one is given, then the new one
-     * will be used.  Similarly with types, though this needs to be used
+     * will be used. Similarly with types, though this needs to be used
      * carefully, as types should not be lightly changed.
-     * @param other Schema to reconcile with.
-     * @throws ParseException if this cannot be reconciled.
+     * 
+     * @param other
+     *            Schema to reconcile with.
+     * @throws ParseException
+     *             if this cannot be reconciled.
      */
     public void reconcile(Schema other) throws ParseException {
         if (other.size() != size()) {
+            log.debug("Cannot reconcile schemas with different "
+                    + "sizes.  This schema has size " + size()
+                    + " other has size " + "of " + other.size());
             throw new ParseException("Cannot reconcile schemas with different "
-                + "sizes.  This schema has size " + size() + " other has size " 
-                + "of " + other.size());
+                    + "sizes.  This schema has size " + size()
+                    + " other has size " + "of " + other.size());
         }
 
         Iterator<FieldSchema> i = other.mFields.iterator();
@@ -295,22 +309,24 @@
                 ourFs.schema = otherFs.schema;
                 log.debug("Setting schema to: " + otherFs.schema);
             }
-
         }
     }
 
     @Override
     public boolean equals(Object other) {
-        if (!(other instanceof Schema)) return false;
+        if (!(other instanceof Schema))
+            return false;
 
-        Schema s = (Schema)other;
+        Schema s = (Schema) other;
 
-        if (s.size() != size()) return false;
+        if (s.size() != size())
+            return false;
 
         Iterator<FieldSchema> i = mFields.iterator();
         Iterator<FieldSchema> j = s.mFields.iterator();
         while (i.hasNext()) {
-            if (!(i.next().equals(j.next()))) return false;
+            if (!(i.next().equals(j.next())))
+                return false;
         }
         return true;
     }
@@ -475,7 +491,7 @@
                                             otherTakesAliasPrecedence) ;
             
             FieldSchema mergedFs = null ;
-            if (mergedType != DataType.TUPLE) {
+            if (!DataType.isSchemaType(mergedType)) {
                 // just normal merge              
                 mergedFs = new FieldSchema(mergedAlias, mergedType) ;
             }
@@ -562,9 +578,5 @@
         // else return just ERROR
         return DataType.ERROR ;
     }
-    
-    
-}
-
-
 
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=656039&r1=656038&r2=656039&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Tue May 13 15:45:16 2008
@@ -71,7 +71,6 @@
         buildPlan(query);
     }
 
-    
     /* TODO FIX
     @Test
     public void testQuery3() {
@@ -427,7 +426,7 @@
     
     @Test
     public void testQuery32() {
-        String query = "foreach (load 'myfile' as (col1, col2 : (sub1, sub2), col3 : (bag1))) generate col1 ;";
+        String query = "foreach (load 'myfile' as (col1, col2 : tuple(sub1, sub2), col3 : tuple(bag1))) generate col1 ;";
         buildPlan(query);
     }
     
@@ -444,7 +443,7 @@
     @Test
     //TODO: Nested schemas don't work now. Probably a bug in the new parser.
     public void testQuery34() {
-        buildPlan("A = load 'a' as (aCol1, aCol2 : (subCol1, subCol2));");
+        buildPlan("A = load 'a' as (aCol1, aCol2 : tuple(subCol1, subCol2));");
         buildPlan("A = filter A by aCol2 == '1';");
         buildPlan("B = load 'b' as (bCol1, bCol2);");
         String query = "foreach (cogroup A by (aCol1), B by bCol1 ) generate A.aCol2, B.bCol2 ;";
@@ -668,7 +667,7 @@
     public void testQuery61() {
         buildPlan("a = load 'a' as (name, age, gpa);");
         buildPlan("b = load 'b' as (name, height);");
-        String query = "c = cross a,b;";
+        String query = "c = union a,b;";
         buildPlan(query);
     }
 
@@ -676,9 +675,9 @@
     public void testQuery62() {
         buildPlan("a = load 'a' as (name, age, gpa);");
         buildPlan("b = load 'b' as (name, height);");
-        String query = "c = cross a,b;";        
+        String query = "c = cross a,b;";
         buildPlan(query);
-        buildPlan("d = order c by a, b;");
+        buildPlan("d = order c by b::name, height, a::gpa;");
         buildPlan("e = order a by name, age, gpa desc;");
         buildPlan("f = order a by $0 asc, age, gpa desc;");
     }
@@ -708,7 +707,7 @@
 
     @Test
     public void testQuery63() {
-        buildPlan("a = load 'a' as (name, details: (age, gpa));");
+        buildPlan("a = load 'a' as (name, details: tuple(age, gpa));");
         buildPlan("b = group a by details;");
         String query = "d = foreach b generate group.age;";
         //buildPlan("b = group a by 2*3;");
@@ -717,7 +716,40 @@
 		buildPlan("e = foreach a generate name, details;");
     }
 
+    @Test
+    public void testQuery64() {
+        buildPlan("a = load 'a' as (name: chararray, details: tuple(age, gpa));");
+        buildPlan("c = load 'a' as (name, details: bag{age: integer, gpa});");
+        buildPlan("b = group a by details;");
+        String query = "d = foreach b generate group.age;";
+        buildPlan(query);
+		buildPlan("e = foreach a generate name, details;");
+		buildPlan("f = LOAD 'myfile' AS (garage: bag{num_tools: integer}, links: bag{websites: chararray}, page: bag{something_stupid: tuple(yeah_double: double)}, coordinates: bag{another_tuple: tuple(ok_float: float, bite_the_array: bytearray), bag_of_unknown: bag{}});");
+    }
+
+    @Test
+    public void testQueryFail18() {
+        String query = "foreach (load 'myfile' as (col1, col2 : (sub1, sub2), col3 : (bag1))) generate col1 ;";
+        try {
+        	buildPlan(query);
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Exception"));
+        }
+    }
     
+    @Test
+    public void testQueryFail19() {
+        buildPlan("a = load 'a' as (name, age, gpa);");
+        buildPlan("b = load 'b' as (name, height);");
+        String query = "c = cross a,b;";
+        buildPlan(query);
+        try {
+        	buildPlan("d = order c by name, b::name, height, a::gpa;");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Exception"));
+        }
+    }
+
     // Helper Functions
     // =================
     public LogicalPlan buildPlan(String query) {