Posted to commits@pig.apache.org by ga...@apache.org on 2008/08/29 01:44:13 UTC

svn commit: r690049 - in /incubator/pig/branches/types: src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/parser/ src/org/apache/pig/impl/logi...

Author: gates
Date: Thu Aug 28 16:44:13 2008
New Revision: 690049

URL: http://svn.apache.org/viewvc?rev=690049&view=rev
Log:
PIG-400 Fix issues with flatten and schema naming.


Modified:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Thu Aug 28 16:44:13 2008
@@ -72,7 +72,6 @@
 
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        log.info("inputsAccumulated: " + inputsAccumulated);
         if (!inputsAccumulated) {
             Result in = processInput();
             distinctBag = BagFactory.getInstance().newDistinctBag();
@@ -84,11 +83,9 @@
                     continue;
                 }
                 distinctBag.add((Tuple) in.result);
-                log.info("Added tuple" + in.result + " to the distinct bag");
                 in = processInput();
             }
             inputsAccumulated = true;
-            log.info("Distinct bag: " + distinctBag);
         }
         if (it == null) {
             it = distinctBag.iterator();

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Thu Aug 28 16:44:13 2008
@@ -25,10 +25,12 @@
 import java.util.Iterator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.data.DataType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,6 +48,7 @@
 
     private ArrayList<LogicalPlan> mForEachPlans;
     private ArrayList<Boolean> mFlatten;
+    private ArrayList<Schema> mUserDefinedSchema = null;
     private static Log log = LogFactory.getLog(LOForEach.class);
 
     /**
@@ -65,6 +68,16 @@
         mFlatten = flattenList;
     }
 
+    public LOForEach(LogicalPlan plan, OperatorKey k,
+            ArrayList<LogicalPlan> foreachPlans, ArrayList<Boolean> flattenList,
+            ArrayList<Schema> userDefinedSchemaList) {
+
+        super(plan, k);
+        mForEachPlans = foreachPlans;
+        mFlatten = flattenList;
+        mUserDefinedSchema = userDefinedSchemaList;
+    }
+
     public ArrayList<LogicalPlan> getForEachPlans() {
         return mForEachPlans;
     }
@@ -92,6 +105,16 @@
         return DataType.BAG ;
     }
 
+    private void updateAliasCount(Map<String, Integer> aliases, String alias) {
+        if((null == aliases) || (null == alias)) return;
+		Integer count = aliases.get(alias);
+		if(null == count) {
+			aliases.put(alias, 1);
+		} else {
+			aliases.put(alias, ++count);
+		}
+    }
+
     @Override
     public Schema getSchema() throws FrontendException {
         log.debug("Entering getSchema");
@@ -124,6 +147,10 @@
                 try {
 	                planFs = ((ExpressionOperator)op).getFieldSchema();
                     log.debug("planFs: " + planFs);
+                    Schema userDefinedSchema = null;
+                    if(null != mUserDefinedSchema) {
+                        userDefinedSchema = mUserDefinedSchema.get(planCtr);
+                    }
 					if(null != planFs) {
 						String outerCanonicalAlias = op.getAlias();
 						if(null == outerCanonicalAlias) {
@@ -137,56 +164,108 @@
 							//flatten(B.(x,y,z))
 							Schema s = planFs.schema;
 							if(null != s) {
-								for(Schema.FieldSchema fs: s.getFields()) {
+								for(int i = 0; i < s.size(); ++i) {
+                                    Schema.FieldSchema fs;
+                                    try {
+                                        fs = s.getField(i);
+                                    } catch (ParseException pe) {
+                                        throw new FrontendException(pe.getMessage());
+                                    }
 									log.debug("fs: " + fs);
-									log.debug("fs.alias: " + fs.alias);
+                                    if(null != userDefinedSchema) {
+                                        Schema.FieldSchema userDefinedFieldSchema;
+                                        try {
+                                            if(i < userDefinedSchema.size()) {
+                                                userDefinedFieldSchema = userDefinedSchema.getField(i);
+                                                fs = fs.mergePrefixFieldSchema(userDefinedFieldSchema);
+                                            }
+                                        } catch (ParseException pe) {
+                                            throw new FrontendException(pe.getMessage());
+                                        } catch (SchemaMergeException sme) {
+                                            throw new FrontendException(sme.getMessage());
+                                        }
+                                        outerCanonicalAlias = null;
+                                    }
 									String innerCanonicalAlias = fs.alias;
+                                    Schema.FieldSchema newFs;
 									if((null != outerCanonicalAlias) && (null != innerCanonicalAlias)) {
 										String disambiguatorAlias = outerCanonicalAlias + "::" + innerCanonicalAlias;
-										Schema.FieldSchema newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
+										newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
 										fss.add(newFs);
-										Integer count;
-										count = aliases.get(innerCanonicalAlias);
-										if(null == count) {
-											aliases.put(innerCanonicalAlias, 1);
-										} else {
-											aliases.put(innerCanonicalAlias, ++count);
-										}
-										count = aliases.get(disambiguatorAlias);
-										if(null == count) {
-											aliases.put(disambiguatorAlias, 1);
-										} else {
-											aliases.put(disambiguatorAlias, ++count);
-										}
-										flattenAlias.put(newFs, innerCanonicalAlias);
-										inverseFlattenAlias.put(innerCanonicalAlias, true);
+                                        updateAliasCount(aliases, disambiguatorAlias);
 										//it's fine if there are duplicates
 										//we just need to record if its due to
 										//flattening
 									} else {
-										Schema.FieldSchema newFs = new Schema.FieldSchema(null, fs.schema, fs.type);
+										newFs = new Schema.FieldSchema(fs.alias, fs.schema, fs.type);
 										fss.add(newFs);
 									}
+                                    updateAliasCount(aliases, innerCanonicalAlias);
+									flattenAlias.put(newFs, innerCanonicalAlias);
+									inverseFlattenAlias.put(innerCanonicalAlias, true);
 								}
 							} else {
-								Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-								fss.add(newFs);
+                                Schema.FieldSchema newFs;
+                                if(null != userDefinedSchema) {
+                                    if(!DataType.isSchemaType(planFs.type)) {
+                                        if(userDefinedSchema.size() > 1) {
+                                            throw new FrontendException("Schema mismatch. A basic type on flattening cannot have more than one column. User defined schema: " + userDefinedSchema);
+                                        }
+								        newFs = new Schema.FieldSchema(null, planFs.type);
+                                        try {
+                                            newFs = newFs.mergePrefixFieldSchema(userDefinedSchema.getField(0));
+                                        } catch (SchemaMergeException sme) {
+                                            throw new FrontendException(sme.getMessage());
+                                        } catch (ParseException pe) {
+                                            throw new FrontendException(pe.getMessage());
+                                        }
+                                        updateAliasCount(aliases, newFs.alias);
+                                        fss.add(newFs);
+                                    } else {
+                                        for(Schema.FieldSchema ufs: userDefinedSchema.getFields()) {
+                                            fss.add(new Schema.FieldSchema(ufs.alias, ufs.schema, ufs.type));
+                                            updateAliasCount(aliases, ufs.alias);
+                                        }
+                                    }
+								} else {
+                                    if(!DataType.isSchemaType(planFs.type)) {
+								        newFs = new Schema.FieldSchema(null, planFs.type);
+                                    } else {
+								        newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                                    }
+                                    fss.add(newFs);
+                                }
 							}
 						} else {
 							//just populate the schema with the field schema of the expression operator
+                            //if the user has defined a schema for the operator, prefix merge it
+                            //with the expression operator's field schema before adding it to the list
+                            if(null != userDefinedSchema) {
+                                try {
+                                    planFs = planFs.mergePrefixFieldSchema(userDefinedSchema.getField(0));
+                                    updateAliasCount(aliases, planFs.alias);
+                                } catch (SchemaMergeException sme) {
+                                    throw new FrontendException(sme.getMessage());
+                                } catch (ParseException pe) {
+                                    throw new FrontendException(pe.getMessage());
+                                }
+                            }
 	                   		fss.add(planFs);
-							if(null != outerCanonicalAlias) {
-								Integer count = aliases.get(outerCanonicalAlias);
-								if(null == count) {
-									aliases.put(outerCanonicalAlias, 1);
-								} else {
-									aliases.put(outerCanonicalAlias, ++count);
-								}
-							}
 						}
 					} else {
 						//did not get a valid list of field schemas
-						fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+                        String outerCanonicalAlias = null;
+                        if(null != userDefinedSchema) {
+                            try {
+                                Schema.FieldSchema userDefinedFieldSchema = userDefinedSchema.getField(0);
+                                fss.add(userDefinedFieldSchema);
+                                updateAliasCount(aliases, userDefinedFieldSchema.alias);
+                            } catch (ParseException pe) {
+                                throw new FrontendException(pe.getMessage());
+                            }
+                        } else {
+						    fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+                        }
 					}
                 } catch (FrontendException fee) {
                     mSchema = null;

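The net effect of the getSchema() changes above is a per-column prefix merge between the inferred field schema and the user-supplied AS schema, with the usual outer::inner disambiguation suppressed when a user schema is present (outerCanonicalAlias is set to null). A minimal sketch of that per-field merge, using the FieldSchema constructor and the mergePrefixFieldSchema() method added to Schema.java later in this patch (class name and values are illustrative only):

    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    public class FieldMergeSketch {
        public static void main(String[] args) throws Exception {
            // Field schema inferred for the flattened column "name" of the bag being flattened.
            Schema.FieldSchema inferred = new Schema.FieldSchema("name", DataType.CHARARRAY);

            // User wrote "flatten(...) as (myname)": an alias with no declared type (DataType.NULL).
            Schema.FieldSchema userDefined = new Schema.FieldSchema("myname", DataType.NULL);

            // The user alias takes precedence and the inferred type is kept, so the output
            // column is "myname: chararray" rather than a disambiguated "<outer>::name".
            Schema.FieldSchema merged = inferred.mergePrefixFieldSchema(userDefined);
            System.out.println(merged);
        }
    }
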
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java Thu Aug 28 16:44:13 2008
@@ -43,6 +43,7 @@
     //private ArrayList<ExpressionOperator> mProjections;
     private ArrayList<LogicalPlan> mGeneratePlans;
     private ArrayList<Boolean> mFlatten;
+    private ArrayList<Schema> mUserDefinedSchema = null;
     private static Log log = LogFactory.getLog(LOGenerate.class);
 
     /**
@@ -64,6 +65,16 @@
         mFlatten = flatten;
     }
 
+    public LOGenerate(LogicalPlan plan, OperatorKey key,
+            ArrayList<LogicalPlan> generatePlans, ArrayList<Boolean> flatten,
+            ArrayList<Schema> userDefinedSchemaList) {
+        super(plan, key);
+        mGeneratePlans = generatePlans;
+        mFlatten = flatten;
+        mUserDefinedSchema = userDefinedSchemaList;
+    }
+
+
     /**
      * 
      * @param plan
@@ -94,6 +105,10 @@
         return mFlatten;
     }
 
+    public List<Schema> getUserDefinedSchema() {
+        return mUserDefinedSchema;
+    }
+
     @Override
     public String name() {
         return "Generate " + mKey.scope + "-" + mKey.id;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Aug 28 16:44:13 2008
@@ -391,6 +391,26 @@
         log.trace("Exiting attachPlan");
     }
 
+    public static class SchemaUtils {
+        public static void setSchemaDefaultType(Schema s, byte t) {
+            if(null == s) return;
+            for(Schema.FieldSchema fs: s.getFields()) {
+                setFieldSchemaDefaultType(fs, t);
+            }
+        }
+    
+        public static void setFieldSchemaDefaultType(Schema.FieldSchema fs, byte t) {
+            if(null == fs) return;
+            if(DataType.NULL == fs.type) {
+                fs.type = t;
+            }
+            if(DataType.isSchemaType(fs.type)) {
+                setSchemaDefaultType(fs.schema, t);
+            }
+        }
+    }
+	
+
 }
 
 
@@ -478,7 +498,6 @@
         }
 }
 
-	
 PARSER_END(QueryParser)
 
 // Skip all the new lines, tabs and spaces
@@ -670,12 +689,12 @@
 LogicalOperator Expr(LogicalPlan lp) : 
 {
 	LogicalOperator op; 
-	Schema schema; 
+	Schema schema = null; 
 	log.trace("Entering Expr");
 }
 {
 	(
-	( op = NestedExpr(lp) [ <AS> "(" schema = TupleSchema() ")" {op.setSchema(schema);} ] )
+	( op = NestedExpr(lp) [ <AS> "(" schema = TupleSchema() ")" {SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY); op.setSchema(schema);} ] )
 |	op = BaseExpr(lp)
 	)
 	{log.trace("Exiting Expr"); return op;}
@@ -736,7 +755,25 @@
 	(
 	(
     (<DEFINE> op = DefineClause(lp))
-|	(<LOAD> op = LoadClause(lp) [<AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {op.setSchema(schema); op.setCanonicalNames(); log.debug("Load as schema()");schema.printAliases();} | fs = AtomSchema() {schema = new Schema(fs); op.setSchema(schema); log.debug("Load as atomschema()");schema.printAliases();}) ])
+|	(<LOAD> op = LoadClause(lp) 
+        [ <AS> 
+        (
+            LOOKAHEAD(2) "(" schema = TupleSchema() ")" 
+            {
+                SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY); 
+                op.setSchema(schema); 
+                op.setCanonicalNames(); 
+                log.debug("Load as schema" + schema);
+            } 
+        |   fs = AtomSchema() 
+            {
+                schema = new Schema(fs); 
+                op.setSchema(schema); 
+                log.debug("Load as atomschema" + schema);
+            }
+        ) 
+        ]
+    )
 |	((<GROUP> | <COGROUP>) op = CogroupClause(lp))
 |	(<FILTER> op = FilterClause(lp))
 |   (<LIMIT> op = LimitClause(lp))
@@ -1149,6 +1186,7 @@
 	ArrayList<LogicalPlan> listPlans = new ArrayList<LogicalPlan>(); 
 	LogicalPlan groupByPlan;
 	ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
+	ArrayList<Schema> userDefinedSchemaList = new ArrayList<Schema>();
 	log.trace("Entering GroupItem");
 	log.debug("LogicalPlan: " + lp);
 }
@@ -1159,16 +1197,16 @@
 			( <BY> 
 				( 
 					LOOKAHEAD ( "(" FlattenedGenerateItemList(cgOp.getSchema(), null, groupByPlan, cgOp) ")" )
-					( "(" es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList) 
+					( "(" es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) 
 						{listPlans.add(groupByPlan);}
 						(
-							"," es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList) 
+							"," es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) 
 							{listPlans.add(groupByPlan);}
 						)*
 						")" 
 					)
 				|	(
-						es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList) 
+						es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) 
 						{listPlans.add(groupByPlan);}
 					)
 				)
@@ -1412,6 +1450,7 @@
         LOGenerate generate = (LOGenerate)specList.get(specList.size() - 1);
         List<LogicalPlan> generatePlans = generate.getGeneratePlans();
         List<Boolean> flattenList = generate.getFlatten();
+        List<Schema> userDefinedSchemaList = generate.getUserDefinedSchema();
         /*
         Generate's nested plans will be translated to foreach's nested plan
         If generate contains an expression that does not require generate's
@@ -1461,7 +1500,7 @@
         }
 
 		resetGenerateState();
-		foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), (ArrayList)foreachPlans, (ArrayList)flattenList);
+		foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), (ArrayList)foreachPlans, (ArrayList)flattenList, (ArrayList) userDefinedSchemaList);
 		try {
 			lp.add(foreach);
 			log.debug("Added operator " + foreach.getClass().getName() + " to the logical plan");
@@ -1802,17 +1841,18 @@
 {
 	ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>(); 
 	ArrayList<Boolean> flattenList = new ArrayList<Boolean>(); 
+	ArrayList<Schema> userDefinedSchemaList = new ArrayList<Schema>(); 
 	ExpressionOperator item;
 	LogicalPlan generatePlan;
 	log.trace("Entering FlattenedGenerateItemList");
 }
 {
 	(
-        item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList)
+        item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList, userDefinedSchemaList)
             {
                 generatePlans.add(generatePlan);
             }
-        ("," item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList)
+        ("," item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList, userDefinedSchemaList)
             {
                 generatePlans.add(generatePlan);
             }
@@ -1820,7 +1860,7 @@
 
     )
 	{
-		LogicalOperator generate = new LOGenerate(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList);
+		LogicalOperator generate = new LOGenerate(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList, userDefinedSchemaList);
 		lp.add(generate);
 		log.debug("Added operator " + generate.getClass().getName() + " to the logical plan");
 		log.trace("Exiting FlattenedGenerateItemList");
@@ -1829,7 +1869,7 @@
 }
 	
 
-ExpressionOperator FlattenedGenerateItem(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input, ArrayList<Boolean> flattenList): 
+ExpressionOperator FlattenedGenerateItem(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input, ArrayList<Boolean> flattenList, ArrayList<Schema> userDefinedSchemaList): 
 {
 	ExpressionOperator item; 
 	Schema schema = null; 
@@ -1841,13 +1881,16 @@
 {
 	(
 	(
-	(
 		<FLATTEN> "(" item = InfixExpr(over,specs,lp,input) ")" 
 		{
 			flatten = true;
 		}
+        [ <AS> "(" schema = TupleSchema() ")" ]
 	)
-|	(item = InfixExpr(over,specs,lp,input))
+|	
+    (
+    (
+    (item = InfixExpr(over,specs,lp,input))
 |	( <STAR> 
 		{
 			LOProject project = new LOProject(lp, new OperatorKey(scope, getNextId()), input, -1); 
@@ -1861,15 +1904,17 @@
 			log.debug("FGItem: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp);
 		}
 	)
+    )
+        [ <AS> fs = FieldSchema() ]
 	)
-        [ <AS> (fs = FieldSchema()) ]
 	)
 	{
 		log.debug("item: " + item.getClass().getName());
         if(null != fs) {
-            item.setFieldSchema(fs);
+            schema = new Schema(fs);
         }
-		flattenList.add(flatten);
+	    flattenList.add(flatten);
+        userDefinedSchemaList.add(schema);
 		log.trace("Exiting FlattenedGenerateItem");
 		return item;
 	}
@@ -2289,7 +2334,7 @@
 Schema.FieldSchema AtomSchema() : 
 {
 	Token t1 = null;
-	byte type = DataType.BYTEARRAY;
+	byte type = DataType.NULL;
 	Schema.FieldSchema fs;
 	log.trace("Entering AtomSchema");
 }
@@ -2316,7 +2361,7 @@
 	log.trace("Entering SchemaMap");
 }
 {
-	( t1 = <IDENTIFIER> )  [":" <MAP>] "[" "]"
+	( t1 = <IDENTIFIER> )  [LOOKAHEAD(2) ":" <MAP>| ":"] "[" "]"
 	{
 		if (null != t1) {
 			log.debug("MAP alias " + t1.image);

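The SchemaUtils helper added above lets AS clauses leave fields untyped (DataType.NULL) and have the caller fill in a default afterwards; the other call sites in this patch (the LOAD/Expr productions, TestLogicalPlanBuilder, LogicalPlanLoader) all pass DataType.BYTEARRAY. A minimal sketch of that behaviour (class name and values are illustrative only):

    import java.util.Arrays;
    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.parser.QueryParser;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    public class DefaultTypeSketch {
        public static void main(String[] args) throws Exception {
            // Equivalent of "as (name: chararray, age)": the untyped field parses as DataType.NULL.
            Schema s = new Schema(Arrays.asList(
                    new Schema.FieldSchema("name", DataType.CHARARRAY),
                    new Schema.FieldSchema("age", DataType.NULL)));

            // Recursively replaces NULL types with the default, descending into tuple/bag schemas.
            QueryParser.SchemaUtils.setSchemaDefaultType(s, DataType.BYTEARRAY);

            // s is now (name: chararray, age: bytearray).
            System.out.println(s);
        }
    }
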
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Thu Aug 28 16:44:13 2008
@@ -335,6 +335,83 @@
             }
         }
 
+        /***
+        * Recursively prefix merge two field schemas
+        * @param otherFs the other field schema to be merged with
+        * @return the prefix merged field schema; this field schema is
+        *         returned unchanged if otherFs is null
+        *
+        * @throws SchemaMergeException if they cannot be merged
+        */
+
+        public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs) throws SchemaMergeException {
+            return mergePrefixFieldSchema(otherFs, true);
+        }
+
+        /***
+        * Recursively prefix merge two field schemas
+        * @param otherFs the other field schema to be merged with
+        * @param otherTakesAliasPrecedence true if aliases from the other
+        *                                  field schema take precedence
+        * @return the prefix merged field schema; this field schema is
+        *         returned unchanged if otherFs is null
+        *
+        * @throws SchemaMergeException if they cannot be merged
+        */
+
+        public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs,
+                                            boolean otherTakesAliasPrecedence)
+                                                throws SchemaMergeException {
+            Schema.FieldSchema myFs = this;
+            Schema.FieldSchema mergedFs = null;
+            byte mergedType = DataType.NULL;
+    
+            if(null == otherFs) {
+                return myFs;
+            }
+
+            if((myFs.type == DataType.NULL || myFs.type == DataType.UNKNOWN)
+                && (otherFs.type == DataType.NULL || otherFs.type == DataType.UNKNOWN)) {
+                throw new SchemaMergeException("Type mismatch. No useful type for merging. Field Schema: " + myFs + ". Other Field Schema: " + otherFs);
+            } else if(myFs.type == otherFs.type) {
+                mergedType = myFs.type;
+            } else if((myFs.type == DataType.NULL || myFs.type == DataType.UNKNOWN) 
+                && (otherFs.type == DataType.NULL)) {
+                mergedType = DataType.BYTEARRAY;
+            } else if ((myFs.type != DataType.NULL && myFs.type != DataType.UNKNOWN)
+                && (otherFs.type == DataType.NULL)) {
+                mergedType = myFs.type;
+            } else {
+                throw new SchemaMergeException("Type mismatch. Field Schema: " + myFs + ". Other Field Schema: " + otherFs);
+            }
+    
+            String mergedAlias = mergeAlias(myFs.alias,
+                                            otherFs.alias,
+                                            otherTakesAliasPrecedence) ;
+    
+            if (!DataType.isSchemaType(mergedType)) {
+                // just normal merge
+                mergedFs = new FieldSchema(mergedAlias, mergedType) ;
+            }
+            else {
+                Schema mergedSubSchema = null;
+                // merge inner schemas because both sides have schemas
+                if(null != myFs.schema) {
+                    mergedSubSchema = myFs.schema.mergePrefixSchema(otherFs.schema,
+                                                     otherTakesAliasPrecedence);
+                } else {
+                    mergedSubSchema = otherFs.schema;
+                }
+                // create the merged field
+                try {
+                    mergedFs = new FieldSchema(mergedAlias, mergedSubSchema, mergedType) ;
+                } catch (FrontendException fee) {
+                    throw new SchemaMergeException(fee.getMessage());
+                }
+            }
+            return mergedFs;
+        }
+
     }
 
     private List<FieldSchema> mFields;
@@ -1030,6 +1107,70 @@
         return new Schema(outerSchema);
     }
 
+    /***
+     * Recursively prefix merge two schemas
+     * @param other the other schema to be merged with
+     * @param otherTakesAliasPrecedence true if aliases from the other
+     *                                  schema take precedence
+     * @return the prefix merged schema; this schema is returned
+     *         unchanged if other is null
+     *
+     * @throws SchemaMergeException if they cannot be merged
+     */
+
+    public Schema mergePrefixSchema(Schema other,
+                               boolean otherTakesAliasPrecedence)
+                                    throws SchemaMergeException {
+        Schema schema = this;
+
+        if (other == null) {
+                return this ;
+        }
+
+        if (schema.size() < other.size()) {
+            throw new SchemaMergeException("Schema size mismatch. Other schema size greater than schema size. Schema: " + this + ". Other schema: " + other);
+        }
+
+        List<FieldSchema> outputList = new ArrayList<FieldSchema>() ;
+
+        List<FieldSchema> mylist = schema.mFields ;
+        List<FieldSchema> otherlist = other.mFields ;
+
+        // We iterate up to the smaller one's size
+        int iterateLimit = other.mFields.size();
+
+        int idx = 0;
+        for (; idx< iterateLimit ; idx ++) {
+
+            // Just for readability
+            FieldSchema myFs = mylist.get(idx) ;
+            FieldSchema otherFs = otherlist.get(idx) ;
+
+            FieldSchema mergedFs = myFs.mergePrefixFieldSchema(otherFs, otherTakesAliasPrecedence);
+            outputList.add(mergedFs) ;
+        }
+        // if the first schema has leftover, then append the rest
+        for(int i=idx; i < mylist.size(); i++) {
+
+            FieldSchema fs = mylist.get(i) ;
+
+            // for non-schema types
+            if (!DataType.isSchemaType(fs.type)) {
+                outputList.add(new FieldSchema(fs.alias, fs.type)) ;
+            }
+            // for TUPLE & BAG
+            else {
+                try {
+                    FieldSchema tmp = new FieldSchema(fs.alias, fs.schema, fs.type) ;
+                    outputList.add(tmp) ;
+                } catch (FrontendException fee) {
+                    throw new SchemaMergeException(fee.getMessage());
+                }
+            }
+        }
+
+        return new Schema(outputList) ;
+    }
 
 }
 

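The new mergePrefixSchema() lets a user-defined schema rename a leading prefix of the columns: aliases from the user schema win for the columns it covers, leftover columns keep their inferred names and types, and a user field left untyped (DataType.NULL) means "keep the inferred type". A minimal sketch matching the testQuery90 cases below (class name and values are illustrative only):

    import java.util.Arrays;
    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    public class PrefixMergeSketch {
        public static void main(String[] args) throws Exception {
            // Schema inferred for flatten(group) in testQuery90: (name: chararray, age: int).
            Schema inferred = new Schema(Arrays.asList(
                    new Schema.FieldSchema("name", DataType.CHARARRAY),
                    new Schema.FieldSchema("age", DataType.INTEGER)));

            // User wrote "as (myname)": one alias with no declared type.
            Schema userDefined = new Schema(new Schema.FieldSchema("myname", DataType.NULL));

            // Prefix merge with the user aliases taking precedence.
            Schema merged = inferred.mergePrefixSchema(userDefined, true);

            // merged is (myname: chararray, age: int).
            System.out.println(merged);
        }
    }
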
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Thu Aug 28 16:44:13 2008
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ByteArrayInputStream;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -56,6 +57,8 @@
 import org.apache.pig.impl.logicalLayer.LOPrinter;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.parser.QueryParser ;
+import org.apache.pig.impl.logicalLayer.parser.ParseException ;
 
 
 public class TestLogicalPlanBuilder extends junit.framework.TestCase {
@@ -863,7 +866,7 @@
     @Test
     public void testQuery70() {
         buildPlan(" a = load 'input1';");
-        buildPlan(" b = foreach a generate [10L#'hello', 4.0e-2#10L, 0.5f#(1), 'world'#42, 42#{('guide')}] as mymap;");
+        buildPlan(" b = foreach a generate [10L#'hello', 4.0e-2#10L, 0.5f#(1), 'world'#42, 42#{('guide')}] as mymap:map[];");
         buildPlan(" c = foreach b generate mymap#10L;");
     }
 
@@ -1157,6 +1160,112 @@
         }
     }
 
+    @Test
+    public void testQuery90() throws FrontendException, ParseException {
+        LogicalPlan lp;
+        LOForEach foreach;
+
+        buildPlan("a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);");
+        buildPlan("b = group a by (name, age);");
+
+        //the first element in group, i.e., name is renamed as myname
+        lp = buildPlan("c = foreach b generate flatten(group) as (myname), COUNT(a) as mycount;");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+        assertTrue(foreach.getSchema().equals(getSchemaFromString("myname: chararray, age: int, mycount: long")));
+
+        //the first and second elements in group, i.e., name and age are renamed as myname and myage
+        lp = buildPlan("c = foreach b generate flatten(group) as (myname, myage), COUNT(a) as mycount;");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+        assertTrue(foreach.getSchema().equals(getSchemaFromString("myname: chararray, myage: int, mycount: long")));
+
+        //the schema of group is unchanged
+        lp = buildPlan("c = foreach b generate flatten(group) as (), COUNT(a) as mycount;");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+        assertTrue(foreach.getSchema().equals(getSchemaFromString("name: chararray, age: int, mycount: long")));
+
+        //group is renamed as mygroup
+        lp = buildPlan("c = foreach b generate group as mygroup, COUNT(a) as mycount;");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+        assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(name: chararray, age: int), mycount: long")));
+
+        //group is renamed as mygroup and the first element is renamed as myname
+        lp = buildPlan("c = foreach b generate group as mygroup:(myname), COUNT(a) as mycount;");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+        assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(myname: chararray, age: int), mycount: long")));
+
+        //group is renamed as mygroup and the elements are renamed as myname and myage
+        lp = buildPlan("c = foreach b generate group as mygroup:(myname, myage), COUNT(a) as mycount;");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+        assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(myname: chararray, myage: int), mycount: long")));
+
+        //group is renamed to mygroup as the tuple schema is empty
+        lp = buildPlan("c = foreach b generate group as mygroup:(), COUNT(a) as mycount;");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+        assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(name: chararray, age: int), mycount: long")));
+
+        /*
+        //forcing an error by having more elements in the schema
+        lp = buildPlan("c = foreach B generate group as mygroup:(myname, myage, mygpa), COUNT(A) as mycount;");
+        lp = buildPlan("c = foreach B generate group as mygroup:(myname: int, myage), COUNT(A) as mycount;");
+        lp = buildPlan("c = foreach B generate group as mygroup:(myname, myage: chararray), COUNT(A) as mycount;");
+        lp = buildPlan("c = foreach B generate group as mygroup:{t: (myname, myage)}, COUNT(A) as mycount;");
+        lp = buildPlan("c = foreach B generate flatten(group) as (myname, myage, mygpa), COUNT(A) as mycount;");
+
+        foreach = (LOForEach) lp.getLeaves().get(0);
+
+        assertTrue(foreach.getSchema().equals(getSchemaFromString()));
+        */
+
+    }
+
+    @Test
+    public void testQueryFail90() throws FrontendException, ParseException {
+        buildPlan("a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);");
+        buildPlan("b = group a by (name, age);");
+
+        try {
+            buildPlan("c = foreach b generate group as mygroup:(myname, myage, mygpa), COUNT(a) as mycount;");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Schema size mismatch"));
+        }
+
+        try {
+            buildPlan("c = foreach b generate group as mygroup:(myname: int, myage), COUNT(a) as mycount;");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Type mismatch"));
+        }
+
+        try {
+            buildPlan("c = foreach b generate group as mygroup:(myname, myage: chararray), COUNT(a) as mycount;");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Type mismatch"));
+        }
+
+        try {
+            buildPlan("c = foreach b generate group as mygroup:{t: (myname, myage)}, COUNT(a) as mycount;");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Type mismatch"));
+        }
+
+        try {
+            buildPlan("c = foreach b generate flatten(group) as (myname, myage, mygpa), COUNT(a) as mycount;");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Schema size mismatch"));
+        }
+    }
+
+    private Schema getSchemaFromString(String schemaString) throws ParseException {
+        return getSchemaFromString(schemaString, DataType.BYTEARRAY);
+    }
+
+    private Schema getSchemaFromString(String schemaString, byte defaultType) throws ParseException {
+        ByteArrayInputStream stream = new ByteArrayInputStream(schemaString.getBytes()) ;
+        QueryParser queryParser = new QueryParser(stream) ;
+        Schema schema = queryParser.TupleSchema() ;
+        QueryParser.SchemaUtils.setSchemaDefaultType(schema, defaultType);
+        return schema;
+    }
+
     private void printPlan(LogicalPlan lp) {
         LOPrinter graphPrinter = new LOPrinter(System.err, lp);
         System.err.println("Printing the logical plan");

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java Thu Aug 28 16:44:13 2008
@@ -2418,7 +2418,7 @@
         // check outer schema
         Schema endResultSchema = foreach1.getSchema() ;
 
-        assertEquals(endResultSchema.getField(0).type, DataType.BYTEARRAY) ;
+        assertEquals(endResultSchema.getField(0).type, DataType.FLOAT) ;
         assertEquals(endResultSchema.getField(1).type, DataType.LONG) ;
 
     }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java Thu Aug 28 16:44:13 2008
@@ -989,7 +989,7 @@
         // check outer schema
         Schema endResultSchema = foreach1.getSchema() ;
 
-        assertEquals(endResultSchema.getField(0).type, DataType.BYTEARRAY) ;
+        assertEquals(endResultSchema.getField(0).type, DataType.FLOAT) ;
         assertEquals(endResultSchema.getField(1).type, DataType.DOUBLE) ;
     }
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java?rev=690049&r1=690048&r2=690049&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java Thu Aug 28 16:44:13 2008
@@ -180,6 +180,7 @@
             Schema schema = null ;
             try {
                 schema = queryParser.TupleSchema() ;
+                QueryParser.SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY);
                 
                 // set all the [NoAlias] to null
                 for(int i=0; i < dummyAliasCounter; i++) {