Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/08 23:25:54 UTC

svn commit: r654629 [3/4] - in /incubator/pig/branches/types: ./ src/org/apache/pig/impl/eval/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/parser/ src/org/apache/pig/impl/logicalLayer/schema/ src/org/apache/pig/impl/logic...

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=654629&r1=654628&r2=654629&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu May  8 14:25:22 2008
@@ -33,17 +33,18 @@
 import java.util.*;
 import org.apache.pig.impl.logicalLayer.*;
 import org.apache.pig.impl.logicalLayer.schema.*;
-import org.apache.pig.impl.eval.*;
-import org.apache.pig.impl.eval.window.*;
-import org.apache.pig.impl.eval.cond.*;
-import org.apache.pig.*;
-import org.apache.pig.data.*;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.ExecType;
 import org.apache.pig.impl.io.*;
-import org.apache.pig.builtin.*;
-import org.apache.pig.impl.builtin.*;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.builtin.GFAny;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.plan.MultiMap;
+import org.apache.pig.impl.plan.PlanException;
+
 
 public class QueryParser {
 	private PigContext pigContext;
@@ -51,32 +52,33 @@
 	private Map<OperatorKey, LogicalOperator> opTable;
 	private String scope;
 	private NodeIdGenerator nodeIdGen;
-
+	//a map of alias to logical operator for a quick lookup
+	private Map<String, LogicalOperator> mapAliasOp;
+	private static Log log = LogFactory.getLog(QueryParser.class);
+	
 	private long getNextId() {
 		return nodeIdGen.getNextNodeId(scope);
 	}
 
-	public QueryParser(InputStream in, 
+	public QueryParser(InputStream in,
 					   PigContext pigContext, 
 					   String scope, 
 					   Map<String, LogicalPlan> aliases,
-					   Map<OperatorKey, LogicalOperator> opTable) {
+					   Map<OperatorKey, LogicalOperator> opTable,
+					   Map<String, LogicalOperator> aliasOp) {
 		this(in);
 		this.pigContext = pigContext;
 		this.aliases = aliases;
 		this.opTable = opTable;
 		this.scope = scope;
 		this.nodeIdGen = NodeIdGenerator.getGenerator();
+		this.mapAliasOp = aliasOp;
 	}
-	
-	public class EvalSpecAndSchema{
-		EvalSpec spec;
-		TupleSchema schema;
-	}
-	
+
 	public class CogroupInput {
-		public OperatorKey op;
-		public EvalSpec spec;
+		public LogicalOperator op;
+		public ArrayList<LogicalPlan> plans;
+		public boolean isInner;
 	}
 	    
     private static String removeQuotes(String str) {
@@ -100,16 +102,14 @@
 
         long storeNodeId = NodeIdGenerator.getGenerator().getNextNodeId(scope);
 
-		LogicalOperator root = new LOStore(opTable,
-										   scope,
-                                           storeNodeId,
-                                           readFrom.getRoot(),
-                                           new FileSpec(fileName, func),
-                                           false);
+		LogicalOperator root = new LOStore(readFrom,
+										   new OperatorKey(scope, storeNodeId),
+                                           new FileSpec(fileName, func));
+                                         
                                            
-        LogicalPlan storePlan = new LogicalPlan(root.getOperatorKey(), opTable, pigContext);
-
-        return storePlan;
+        readFrom.add(root);
+				
+        return readFrom;
     }
 
     static String unquote(String s) {
@@ -120,108 +120,136 @@
 		return Integer.parseInt(s.substring(1, s.length()));	
 	}
 	
-	
-	
+
 	String massageFilename(String filename, PigContext pigContext) throws IOException, ParseException {
 		if (pigContext.getExecType() != ExecType.LOCAL) {
 			if (filename.startsWith(FileLocalizer.LOCAL_PREFIX)) {
 					filename = FileLocalizer.hadoopify(filename, pigContext);
 			} 
-			else
-			{
-				// make sure that dfs file exists
-				if (!FileLocalizer.fileExists(filename, pigContext))
-				{
-					throw new ParseException(FileLocalizer.fullPath(filename, pigContext) + " does not exist");
-				}
-			}
+			// Removed check for file existence; that will be handled later by InputOutputFileValidator
 		}
 		return filename;
 	}
 	
-	LogicalOperator parseCogroup(ArrayList<CogroupInput> gis) throws ParseException{
+	LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
+		
+		log.trace("Entering parseCogroup");
+		log.debug("LogicalPlan: " + lp);
+		
 		int n = gis.size();
+		log.debug("Number of cogroup inputs = " + n);
+		
+		ArrayList<LogicalOperator> los = new ArrayList<LogicalOperator>();
+		ArrayList<ArrayList<LogicalPlan>> plans = new ArrayList<ArrayList<LogicalPlan>>();
+		MultiMap<LogicalOperator, LogicalPlan> groupByPlans = new MultiMap<LogicalOperator, LogicalPlan>();
+		//Map<LogicalOperator, LogicalPlan> groupByPlans = new HashMap<LogicalOperator, LogicalPlan>();
+		boolean[] isInner = new boolean[n];
 		
-		List<OperatorKey> los = new ArrayList<OperatorKey>();
-		ArrayList<EvalSpec> specs = new ArrayList<EvalSpec>();
+		int arity = gis.get(0).plans.size();
 		
 		for (int i = 0; i < n ; i++){
 			
 			CogroupInput gi = gis.get(i);
 			los.add(gi.op);
-			specs.add(gi.spec);
+			ArrayList<LogicalPlan> planList = gi.plans;
+			plans.add(gi.plans);
+			int numGrpByOps = planList.size();
+			log.debug("Number of group by operators = " + numGrpByOps);
+
+			if(arity != numGrpByOps) {
+				throw new ParseException("The arity of the group by columns does not match.");
+			}
+			for(int j = 0; j < numGrpByOps; ++j) {
+			    groupByPlans.put(gi.op, planList.get(j));
+				for(LogicalOperator root: planList.get(j).getRoots()) {
+					log.debug("Cogroup input plan root: " + root);
+				}
+			}
+			isInner[i] = gi.isInner;
 		}
 		
-		return new LOCogroup(opTable, scope, getNextId(), los, specs);
+		LogicalOperator cogroup = new LOCogroup(lp, new OperatorKey(scope, getNextId()), los, groupByPlans, isInner);
+		lp.add(cogroup);
+		log.debug("Added operator " + cogroup.getClass().getName() + " object " + cogroup + " to the logical plan " + lp);
+		
+		for(LogicalOperator op: los) {
+			lp.connect(op, cogroup);
+			log.debug("Connected operator " + op.getClass().getName() + " to " + cogroup.getClass().getName() + " in the logical plan");
+		}
+
+		log.trace("Exiting parseCogroup");
+		return cogroup;
 	}
 			
-	
-	LogicalOperator rewriteCross(ArrayList<OperatorKey> inputs) throws IOException, ParseException{
-		ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();
-		int n = inputs.size();
+	/**
+	 * The join operator is rewritten as a cogroup followed by a flattening foreach
+	 */
+	LogicalOperator rewriteJoin(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
 		
-		for (int i=0; i< n; i++){
-			CogroupInput gi = new CogroupInput();
-			gis.add(gi);
-
-			gi.op = inputs.get(i);
-
-			ArrayList<EvalSpec> argsColumns = new ArrayList<EvalSpec>();
-			argsColumns.add(new ConstSpec(n+""));
-			argsColumns.add(new ConstSpec(i+""));
-			GenerateSpec args = new GenerateSpec(argsColumns);
-			FuncEvalSpec fes = new FuncEvalSpec(pigContext, GFCross.class.getName(), args);
-			fes.setFlatten(true);
-			gi.spec = new GenerateSpec(fes).getGroupBySpec();
+		log.trace("Entering rewriteJoin");
+		log.debug("LogicalPlan: " + lp);
+		int n = gis.size();
+		ArrayList<ExpressionOperator> flattenedColumns = new ArrayList<ExpressionOperator>();
+		ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>();
+		ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
+		
+		/*
+		 * Construct the projection operators required for the generate
+		 * Make sure that the operators are flattened
+		 */
+
+
+		LogicalPlan foreachPlan = new LogicalPlan();
+		for (int i = 0; i < n; i++) {
+			LogicalPlan projectPlan = new LogicalPlan(); 
+			LogicalOperator projectInput = gis.get(i).op;
+			ExpressionOperator column = new LOProject(projectPlan, new OperatorKey(scope, getNextId()), projectInput, i+1);
+			((LOProject)column).setStar(true);
+			flattenList.add(true);
+			flattenedColumns.add(column);
+			(gis.get(i)).isInner = true;
+			projectPlan.add(column);
+			log.debug("rewriteJoin: Added operator " + column.getClass().getName() + " " + column + " to logical plan " + projectPlan);
+			generatePlans.add(projectPlan);
 		}
-		
-		return rewriteJoin(gis); 
-	}
 
-	LogicalOperator rewriteDistinct(OperatorKey input){
-		//First group the input on *
 		
-		ArrayList<OperatorKey> inputs = new ArrayList<OperatorKey>();
-		inputs.add(input);
+		//Construct the cogroup operator and add it to the logical plan
 		
-		ArrayList<EvalSpec> groupSpecs = new ArrayList<EvalSpec>();
-			
-		groupSpecs.add(new GenerateSpec(new StarSpec()).getGroupBySpec());
+		LogicalOperator cogroup = parseCogroup(gis, lp);
+		lp.add(cogroup);
+		log.debug("Added operator " + cogroup.getClass().getName() + " to the logical plan");
 		
-		LogicalOperator groupedInput = new LOCogroup(opTable, scope, getNextId(), inputs, groupSpecs);
-		
-		//then generate the flattened group
-		EvalSpec projectSpec = new ProjectSpec(0);
-		projectSpec.setFlatten(true);
-		
-		return new LOEval(opTable, scope, getNextId(), groupedInput.getOperatorKey()	, new GenerateSpec(projectSpec));
-	}	
-		
-		
-	
-	LogicalOperator rewriteJoin(ArrayList<CogroupInput> gis) throws IOException, ParseException{
 		
-		int n = gis.size();
-		ArrayList<EvalSpec> flattenedColumns = new ArrayList<EvalSpec>();
+		//Construct the generate operator from the list of projection plans
+		//Add the generate operator to the foreach logical plan
+		LogicalOperator generate = new LOGenerate(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList);
+		foreachPlan.add(generate);
+		log.debug("Added operator " + generate.getClass().getName() + " to the logical plan " + lp);
+
 		
-		for (int i = 0; i < n; i++) {		
-			EvalSpec column = new ProjectSpec(i+1);
-			column.setFlatten(true);
-			flattenedColumns.add(column);
-		}
+		/*
+		 * Construct the foreach operator from the foreach logical plan
+		 * Add the foreach operator to the top level logical plan
+		 */
+		 
+		LogicalOperator foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), foreachPlan);
+		lp.add(foreach);
+		log.debug("Added operator " + foreach.getClass().getName() + " to the logical plan");
+		lp.connect(cogroup, foreach);
+		log.debug("Connected operator " + cogroup.getClass().getName() + " to operator " + foreach.getClass().getName() + " in the logical plan " + lp);
 		
-		return new LOEval(opTable, scope, getNextId(), parseCogroup(gis).getOperatorKey(),new GenerateSpec(flattenedColumns));
+		log.trace("Exiting rewriteJoin");
+		return foreach;
 	}
-		
-	void assertAtomic(EvalSpec spec, boolean desiredAtomic) throws ParseException{
+
+	void assertAtomic(LogicalOperator spec, boolean desiredAtomic) throws ParseException{
 		Boolean isAtomic = null;
-		if (spec instanceof CompositeEvalSpec)
-			spec = ((CompositeEvalSpec)spec).getSpecs().get(0);
-		if ( spec instanceof ConstSpec || 
-			(spec instanceof FuncEvalSpec &&
-                DataType.isAtomic(DataType.findType(((FuncEvalSpec)spec).getReturnType()))))
+		if ( spec instanceof LOConst || 
+			(spec instanceof LOUserFunc &&
+                DataType.isAtomic(DataType.findType(((LOUserFunc)spec).getType()))))
 			isAtomic = true;
-		else if (spec instanceof FuncEvalSpec)
+		else if (spec instanceof LOUserFunc)
 			isAtomic = false;
 		
 		if (isAtomic != null && isAtomic != desiredAtomic){
@@ -232,17 +260,65 @@
 		}
 	}					
 
-	EvalSpec copyItemAndAddSpec(EvalSpec spec, EvalSpec successor) throws ParseException{
-		assertAtomic(spec,false);
-		spec = spec.copy(pigContext);
-		return spec.addSpec(successor);
-	}
-	
-	 void addSplitOutput(LogicalPlan lp, LOSplit splitOp, String alias, Cond cond, int index){
-		splitOp.addCond(cond);
-		LOSplitOutput splitOut = new LOSplitOutput(opTable, scope, getNextId(), lp.getRoot(), index);
-		aliases.put(alias, new LogicalPlan(splitOut.getOperatorKey(), opTable, pigContext));
+	 void addSplitOutput(LogicalPlan lp, LOSplit splitOp, String alias, LogicalPlan condPlan, int index) throws PlanException{
+		LogicalOperator splitOutput = new LOSplitOutput(lp, new OperatorKey(scope, getNextId()), index);
+		splitOp.addOutput(splitOutput);
+		splitOp.addOutputAlias(alias, condPlan);
+		addAlias(alias, splitOutput);
+		
+		lp.add(splitOutput);
+		log.debug("Added alias: " + splitOutput.getAlias() + " class: " 
+			+ splitOutput.getClass().getName() + " to the logical plan");
+			
+		lp.connect(splitOp, splitOutput);
+		log.debug("Connected " + splitOp.getClass().getName() + " to class: "
+			+ splitOutput.getClass().getName() + " in the logical plan");
+		
+	 }
+	 
+	 void addAlias(String alias, LogicalOperator lOp) {
+	 	mapAliasOp.put(alias, lOp);
 	 }
+	 
+	 LogicalOperator getOp(String alias) {
+	 	return mapAliasOp.get(alias);
+	 }
+	 
+	 //BEGIN
+	 //I am maintaining state about the operators that should
+	 //serve as the inputs to generate in the foreach logical
+	 //plan. I did not want to pass this structure around for
+	 //the entire parse tree
+
+	 private boolean insideGenerate = false; //to check if we are parsing inside a generate statement
+	 private List<LogicalOperator> generateInputs = new ArrayList<LogicalOperator>();
+
+	boolean insideGenerate() {
+		return insideGenerate;
+	}
+
+	void setInsideGenerate(boolean b) {
+		insideGenerate = b;
+	}
+
+	List<LogicalOperator> getGenerateInputs() {
+	 	return generateInputs;
+	}
+
+	void resetGenerateInputs() {
+		generateInputs.clear();
+	}
+
+	void addGenerateInput(LogicalOperator op) {
+		generateInputs.add(op);
+	}
+
+	void resetGenerateState() {
+		insideGenerate = false;
+		resetGenerateInputs();
+	}
+
+	 //END
 
 }
 
@@ -290,15 +366,11 @@
 TOKEN : { <AND : "and"> }
 TOKEN : { <OR : "or"> }
 TOKEN : { <NOT : "not"> }
-TOKEN : { <CONTINUOUSLY : "continuously"> }
-TOKEN : { <WINDOW : "window"> }
-TOKEN : { <SECONDS : "seconds"> }
-TOKEN : { <MINUTES : "minutes"> }
-TOKEN : { <HOURS : "hours"> }
-TOKEN : { <TUPLES : "tuples"> }
 TOKEN : { <GENERATE : "generate"> }
 TOKEN : { <FLATTEN : "flatten"> }
 TOKEN : { <EVAL : "eval"> }
+TOKEN : { <ASC : "asc"> }
+TOKEN : { <DESC : "desc"> }
 
 TOKEN:
 {
@@ -321,129 +393,195 @@
 TOKEN : { <DOLLARVAR : "$" <INTEGER> > }
 
 // Parse is the Starting function.
-LogicalPlan Parse() : {LogicalOperator root; Token t1;}
+LogicalPlan Parse() : 
+{
+	LogicalOperator root; 
+	Token t1; 
+	LogicalPlan lp = new LogicalPlan();
+	log.trace("Entering Parse");
+}
 {
 	(
 	LOOKAHEAD(2)
-	(t1 = <IDENTIFIER> "=" root = Expr() ";" {root.setAlias(t1.image);})
-|	(root = Expr() ";")
-|	(<SPLIT> root = SplitClause() ";")
+	(t1 = <IDENTIFIER> "=" root = Expr(lp) ";" {root.setAlias(t1.image); addAlias(t1.image, root);})
+|	(root = Expr(lp) ";")
+|	(<SPLIT> root = SplitClause(lp) ";")
 	)
-	{ return new LogicalPlan(root.getOperatorKey(), opTable, pigContext); }
+	{ 
+		if(null != root.getSchema()) {
+			log.debug("Root: " + root.getClass().getName() + " schema aliases");
+			root.getSchema().printAliases();
+		}
+		
+		log.trace("Exiting Parse");
+		return lp; 
+	}
 }
 
-LogicalOperator SplitClause():
-{LogicalOperator input; Cond cond; Token alias; LOSplit splitOp; LogicalPlan lp; int i=0;}
+LogicalOperator SplitClause(LogicalPlan lp):
+{
+	LogicalOperator input; 
+	ExpressionOperator cond; 
+	Token alias; 
+	LOSplit splitOp; 
+	int index = 0; 
+	LogicalPlan condPlan; 
+	log.trace("Entering SplitClause");
+}
 {
 	(
-	input = NestedExpr() <INTO> 
+	input = NestedExpr(lp) <INTO> 
 	{
-		splitOp = new LOSplit(opTable, scope, getNextId(), input.getOperatorKey());
-		lp = new LogicalPlan(splitOp.getOperatorKey(), opTable, pigContext);
+		splitOp = new LOSplit(lp, input.getOperatorKey(), new ArrayList<LogicalOperator>(), new HashMap<String,LogicalPlan>());
+		lp.add(splitOp);
+		log.debug("Adding operator " + splitOp.getClass().getName() + " to the logical plan");		
 	}
-	alias = <IDENTIFIER> <IF> cond = PCond(input.outputSchema(),null) 
+	alias = <IDENTIFIER> <IF> cond = PCond(input.getSchema(), null, condPlan = new LogicalPlan(), input) 
 	{
-		addSplitOutput(lp, splitOp, alias.image, cond, i);
-		i++;
+		addSplitOutput(lp, splitOp, alias.image, condPlan, index);
+		++index;
+		log.debug("Added splitoutput");
 	}
 	(
-	"," alias = <IDENTIFIER> <IF> cond = PCond(input.outputSchema(),null)
+	"," alias = <IDENTIFIER> <IF> cond = PCond(input.getSchema(), null, condPlan = new LogicalPlan(), input)
 	{
-		addSplitOutput(lp, splitOp, alias.image, cond, i);
-		i++;
+		addSplitOutput(lp, splitOp, alias.image, condPlan, index);
+		++index;
+		log.debug("Added splitoutput");
 	}
 	)+
 	)
-	{return splitOp;}
+	{log.trace("Exiting SplitClause"); return splitOp;}
 } 
 
 
-LogicalOperator Expr() : {LogicalOperator op; TupleSchema schema;}
+LogicalOperator Expr(LogicalPlan lp) : 
+{
+	LogicalOperator op; 
+	Schema schema; 
+	log.trace("Entering Expr");
+}
 {
 	(
-	( op = NestedExpr() [ <AS> schema = SchemaTuple() {op.setSchema(schema);} ] )
-|	op = BaseExpr()
+	( op = NestedExpr(lp) [ <AS> "(" schema = TupleSchema() ")" {op.setSchema(schema);} ] )
+|	op = BaseExpr(lp)
 	)
-	{return op;}
+	{log.trace("Exiting Expr"); return op;}
 }	
 
-LogicalOperator NestedExpr() : {LogicalOperator op;}
+LogicalOperator NestedExpr(LogicalPlan lp) : 
+{
+	LogicalOperator op; 
+	ExpressionOperator eOp;
+	Map<String, LogicalOperator> specs = null; 
+	log.trace("Entering NestedExpr");
+}
 {
 	(
-	(op = Alias())
-|	LOOKAHEAD(2) ( "(" op = NestedExpr() ")" )
-|	( "(" op = BaseExpr() ")" )
+	(op = Alias(lp))
+|	LOOKAHEAD(2) ( "(" op = NestedExpr(lp) ")" )
+|	( "(" op = BaseExpr(lp) ")" )
 	)
-	{return op;}
+	{log.trace("Exiting NestedExpr"); return op;}
 }
 
 // A reference to an alias
-LogicalOperator Alias() : {Token t1; LogicalOperator op;}
+LogicalOperator Alias(LogicalPlan lp) : 
+{
+	Token t1; 
+	LogicalOperator op; 
+	log.trace("Entering Alias");
+}
 {
 	t1 = <IDENTIFIER> 
 	{
 		LogicalOperator aliasOp;
 		String alias = t1.image;
 		
-		aliasOp = opTable.get(aliases.get(alias).getRoot());
-		
+		aliasOp = getOp(alias);
 		if (aliasOp == null) {
 			throw new ParseException("Unrecognized alias " + alias);
 		}
+		addAlias(alias, aliasOp);
+		log.debug("Added " + alias + " to aliasOp");
 		
+		lp.add(aliasOp);
+		log.debug("Added operator: " + aliasOp.getClass().getName() + " to the logical plan " + lp);
+		log.trace("Exiting Alias");
 		return aliasOp;
 	}
 }
 
-
-
-	
-	
-LogicalOperator BaseExpr() : {LogicalOperator op; TupleSchema schema; Token t1, t2;}
+LogicalOperator BaseExpr(LogicalPlan lp) : 
+{
+	LogicalOperator op; 
+	Schema schema; 
+	Token t1, t2; 
+	Schema.FieldSchema fs; 
+	log.trace("Entering BaseExpr");
+}
 {
 	(
 	(
-	(<LOAD> op = LoadClause() [<AS> schema = SchemaTuple() {op.setSchema(schema);} ])
-|	((<GROUP> | <COGROUP>) op = CogroupClause())
-|	(<FILTER> op = FilterClause())
-|   (<ORDER> op = OrderClause())
-|	(<DISTINCT> op = NestedExpr() {op = rewriteDistinct(op.getOperatorKey());})
-|	(<CROSS> op = CrossClause())
-|   (<JOIN> op = JoinClause())
-|	(<UNION> op = UnionClause())
-|	(<FOREACH> op = ForEachClause())
+	(<LOAD> op = LoadClause(lp) [<AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {op.setSchema(schema);log.debug("Load as schema()");schema.printAliases();} | fs = AtomSchema() {schema = new Schema(fs); op.setSchema(schema); log.debug("Load as atomschema()");schema.printAliases();}) ])
+|	((<GROUP> | <COGROUP>) op = CogroupClause(lp))
+|	(<FILTER> op = FilterClause(lp))
+|   (<ORDER> op = OrderClause(lp))
+|	(<DISTINCT> op = NestedExpr(lp) 
+	{
+		LogicalOperator distinct = new LODistinct(lp, new OperatorKey(scope, getNextId()), op); 
+		lp.add(distinct);
+		log.debug("Added operator: " + distinct.getClass().getName() + " to the logical plan"); 
+		lp.connect(op, distinct);
+		log.debug("Connected alias: " + op.getAlias() + " operator " + op.getClass().getName() + " to operator " + distinct.getClass().getName());
+	})
+|	(<CROSS> op = CrossClause(lp))
+|   (<JOIN> op = JoinClause(lp))
+|	(<UNION> op = UnionClause(lp))
+|	(<FOREACH> op = ForEachClause(lp))
 	)
     [<PARALLEL> t2=<NUMBER> { op.setRequestedParallelism(Integer.parseInt(t2.image));} ]
 	)	
-	{return op;}
+	{log.trace("Exiting BaseExpr"); return op;}
 }
 
-LogicalOperator LoadClause() : {Token t1, t2; String filename; String funcName,funcArgs, funcSpec=null; 
-								LOLoad lo=null; boolean continuous=false;}
+LogicalOperator LoadClause(LogicalPlan lp) : 
+{
+	Token t1, t2; 
+	String filename; 
+	String funcName,funcArgs, funcSpec=null; 
+	LOLoad lo=null; 
+	log.trace("Entering LoadClause");
+}
 {
 	(	filename = FileName()
 		(
 		<USING>  funcName = QualifiedFunction() "(" funcArgs = StringList() ")"
 		{
 			funcSpec = funcName + "(" + funcArgs + ")";
+			log.debug("LoadClause: funcSpec = " + funcSpec);
 		}
 		)?
 	)
-	[ <CONTINUOUSLY> {continuous=true;} ] 
 	{
 		if (funcSpec == null){
-			funcSpec = PigStorage.class.getName();
-			funcSpec += continuous ? "('\t','\n','0')" : "()";
+			funcSpec = PigStorage.class.getName() + "()";
 		}
 		 
-		lo = new LOLoad(opTable, scope, getNextId(), new FileSpec(massageFilename(filename, pigContext), funcSpec));	
-		if (continuous)
-			lo.setOutputType(LogicalOperator.MONOTONE);
+		lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(massageFilename(filename, pigContext), funcSpec), null);
+		lp.add(lo);
+		log.debug("Added operator " + lo.getClass().getName() + " to the logical plan");	
+		
+		log.trace("Exiting LoadClause");
 		return lo;
 	} 
 }    
 
-String StringList() : {StringBuilder sb = new StringBuilder(); Token t;}
+String StringList() : 
+{
+	StringBuilder sb = new StringBuilder(); 
+	Token t;
+}
 {
 	(
 	(
@@ -452,563 +590,1201 @@
 	)
 	| {}
 	)
-	{return sb.toString();}
+	{log.debug("StringList: " + sb.toString()); return sb.toString();}
 }
 
-String FileName(): {Token t;}
+String FileName(): 
+{
+	Token t;
+}
 {
 	t = <QUOTEDSTRING> 
-	{return unquote(t.image);}
+	{log.debug("FileName: " + unquote(t.image)); return unquote(t.image);}
 }
 
-LogicalOperator FilterClause():
-{Cond cond; LogicalOperator input;}
+LogicalOperator FilterClause(LogicalPlan lp):
+{
+	ExpressionOperator cond; LogicalOperator input; 
+	LogicalPlan conditionPlan = new LogicalPlan();
+	log.trace("Entering FilterClause");
+}
 {
-	input = NestedExpr()	
-	 <BY> cond = PCond(input.outputSchema(),null)
+	(
+	input = NestedExpr(lp) {log.debug("Filter input: " + input);}	
+	 <BY> cond = PCond(input.getSchema(),null,conditionPlan,input)
+	 )
 	{
-		return new LOEval(opTable, scope, getNextId(), input.getOperatorKey(), new FilterSpec(cond));
+		LogicalOperator filter = new LOFilter(lp, new OperatorKey(scope, getNextId()), conditionPlan, input);
+		addAlias(input.getAlias(), input);
+		lp.add(filter);
+		log.debug("Added operator " + filter.getClass().getName() + " to the logical plan");
+		
+		lp.connect(input, filter);
+		log.debug("Connected alias " + input.getAlias() + " operator " + input.getClass().getName() + " to operator " + filter.getClass().getName() +" in the logical plan");
+
+		log.trace("Exiting FilterClause");
+		return filter;
 	}
 }
 
 
-Cond PCond(Schema over, Map<String, EvalSpec> specs) : {Cond cond = null;}
+ExpressionOperator PCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
+{
+	ExpressionOperator cond = null; 
+	log.trace("Entering PCond"); 
+	log.debug("PCond Input: " + input);
+}
 {
-	cond = POrCond(over,specs)
-	{return cond;}
+	cond = POrCond(over,specs,lp, input)
+	{log.trace("Exiting PCond"); return cond;}
 }
 
-Cond POrCond(Schema over, Map<String, EvalSpec> specs) : {Cond cond; List<Cond> cList = new ArrayList<Cond>();}
+ExpressionOperator POrCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
+{
+	ExpressionOperator lhsCond, rhsCond; 
+	log.trace("Entering POrCond"); 
+	log.debug("POrCond Input: " + input);
+}
 {
 	(
-	cond = PAndCond(over,specs) {cList.add(cond);}
-	( <OR> cond = PAndCond(over,specs) {cList.add(cond);})* 
+	lhsCond = PAndCond(over,specs,lp,input)
+	(
+		<OR> rhsCond = PAndCond(over,specs,lp,input)
+		{
+			ExpressionOperator exprOp = new LOOr(lp, new OperatorKey(scope, getNextId()), lhsCond, rhsCond);
+			lp.add(exprOp);
+			log.debug("POrCond: Added operator " + exprOp.getClass().getName() + " " + exprOp + " to logical plan " + lp);
+			lp.connect(lhsCond, exprOp);
+			log.debug("POrCond: Connected operator " + lhsCond.getClass().getName() + " " + lhsCond + " to " + exprOp + " logical plan " + lp);
+			lp.connect(rhsCond, exprOp);
+			log.debug("POrCond: Connected operator " + rhsCond.getClass().getName() + " " + rhsCond + " to " + exprOp + " logical plan " + lp);
+			lhsCond = exprOp;
+		}
+	)* 
 	)
 	{
-		if (cList.size()==1)
-			return cond;
-		else	
-			return new OrCond(cList);
+			log.trace("Exiting POrCond");
+			return lhsCond;
 	}
 }
 	
-Cond PAndCond(Schema over, Map<String, EvalSpec> specs) : {Cond cond = null; List<Cond> cList = new ArrayList<Cond>();}
+ExpressionOperator PAndCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
+{
+	ExpressionOperator lhsCond, rhsCond; 
+	log.trace("Entering PAndCond"); 
+	log.debug("PAndCond Input: " + input);
+}
 {
 	(
-	cond = PUnaryCond(over,specs) {cList.add(cond);}
-	( <AND> cond = PUnaryCond(over,specs) {cList.add(cond);} )*
+	lhsCond = PUnaryCond(over,specs,lp,input) 
+	(
+		<AND> rhsCond = PUnaryCond(over,specs,lp,input)
+		{
+			ExpressionOperator exprOp = new LOAnd(lp, new OperatorKey(scope, getNextId()), lhsCond, rhsCond);
+			lp.add(exprOp);
+			log.debug("PAndCond: Added operator " + exprOp.getClass().getName() + " " + exprOp + " to logical plan " + lp);
+			lp.connect(lhsCond, exprOp);
+			log.debug("PAndCond: Connected operator " + lhsCond.getClass().getName() + " " + lhsCond + " to " + exprOp + " logical plan " + lp);
+			lp.connect(rhsCond, exprOp);
+			log.debug("PAndCond: Connected operator " + rhsCond.getClass().getName() + " " + rhsCond + " to " + exprOp + " logical plan " + lp);
+			lhsCond = exprOp;
+		}
+	)*
 	)
 	{
-		if (cList.size()==1)
-			return cond;
-		else	
-			return new AndCond(cList);
+			log.trace("Exiting PAndCond");
+			return lhsCond;
 	}	
 }
 
-Cond PUnaryCond(Schema over, Map<String, EvalSpec> specs) : {Cond cond = null; EvalSpec c1, c2; Token t1; String funcName; GenerateSpec args;}
+ExpressionOperator PUnaryCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
+{
+	ExpressionOperator cond = null; 
+	ExpressionOperator lhs, rhs; 
+	Token t1; 
+	String funcName; 
+	List<ExpressionOperator> args;
+	log.trace("Entering PUnaryCond");
+}
 {
 	(
-	LOOKAHEAD("(" PCond(over,specs) ")")
-	("(" cond = PCond(over,specs) ")")
-|	LOOKAHEAD(InfixExpr(over,specs) <FILTEROP>) 
-		(c1=InfixExpr(over,specs) t1=<FILTEROP> c2=InfixExpr(over,specs) {cond = new CompCond(c1, t1.image, c2);})
-|	LOOKAHEAD(InfixExpr(over,specs) <MATCHES>) 
-		(c1=InfixExpr(over,specs) <MATCHES> t1=<QUOTEDSTRING> {cond = new RegexpCond(c1, unquote(t1.image));})
-|	LOOKAHEAD(FilterFunction() "(") (funcName=FilterFunction() "(" args=EvalArgs(over,specs) ")" {cond = new FuncCond(pigContext, funcName, args);})
-|	cond = PNotCond(over,specs)
+	LOOKAHEAD("(" PCond(over,specs,lp,input) ")")
+	("(" cond = PCond(over,specs,lp,input) ")")
+|	LOOKAHEAD(InfixExpr(over,specs,lp,input) <FILTEROP>) 
+	(lhs=InfixExpr(over,specs,lp,input) t1=<FILTEROP> rhs=InfixExpr(over,specs,lp,input) 
+	{
+		//the long switch case to instantiate the right operator
+		//I have the long switch case from CompCond
+		String op = t1.image;
+		op = op.toLowerCase();
+		
+		char op1 = op.charAt(0);
+        char op2 = op.length() >= 2 ? op.charAt(1) : '0';
+        char op3 = op.length() == 3 ? op.charAt(2) : '0';
+        
+        switch (op1) {
+            // numeric ops first
+        case '=':
+            if (op2 == '=') {
+                cond = new LOEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+            } else {
+                throw new ParseException("Internal error: Invalid filter operator: " + op);
+            }
+            break;
+        case '<':
+            if (op2 == '=') {
+                cond = new LOLesserThanEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+            } else {
+                cond = new LOLesserThan(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+            }
+            break;
+        case '>':
+            if (op2 == '=') {
+                cond = new LOGreaterThanEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+            } else {
+                cond = new LOGreaterThan(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+            }
+            break;
+        case '!':
+            if (op2 == '=') {
+                cond = new LONotEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+            } else {
+                throw new ParseException("Internal error: Invalid filter operator: " + op);
+            }
+            break;
+            // now string ops
+        case 'e':
+            if (op2 == 'q') {
+                cond = new LOEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+            } else {
+                throw new ParseException("Internal error: Invalid filter operator: " + op);
+            }
+            break;
+        case 'l':
+            if (op2 == 't' && op3 == 'e') {
+                cond = new LOLesserThanEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+            } else {
+                cond = new LOLesserThan(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+            }
+            break;
+        case 'g':
+            if (op2 == 't' && op3 == 'e') {
+                cond = new LOGreaterThanEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+            } else {
+                cond = new LOGreaterThan(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+            }
+            break;
+        case 'n':
+            if (op2 == 'e' && op3 == 'q') {
+                cond = new LONotEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+            } else {
+                throw new ParseException("Internal error: Invalid filter operator: " + op);
+            }
+            break;
+        default:
+            throw new ParseException("Internal error: Invalid filter operator: " + op);
+        }
+        
+        lp.add(cond);
+		log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp);
+		lp.connect(lhs, cond);
+		log.debug("PUnaryCond: Connected operator " + lhs.getClass().getName() + " " + lhs+ " to " + cond + " logical plan " + lp);
+		lp.connect(rhs, cond);
+		log.debug("PUnaryCond: Connected operator " + rhs.getClass().getName() + " " + rhs+ " to " + cond + " logical plan " + lp);
+	}
+	)
+|	LOOKAHEAD(InfixExpr(over,specs,lp,input) <MATCHES>) 
+		(lhs=InfixExpr(over,specs,lp,input) <MATCHES> t1=<QUOTEDSTRING> 
+			{
+				cond = new LORegexp(lp, new OperatorKey(scope, getNextId()), lhs, unquote(t1.image)); 
+				lp.add(cond); 
+				log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp);
+				lp.connect(lhs, cond);
+				log.debug("PUnaryCond: Connected operator " + cond.getClass().getName() + " " + cond + " to " + lhs + " logical plan " + lp);
+			}
+		)
+|	LOOKAHEAD(EvalFunction() "(") 
+		(funcName=EvalFunction() "(" args=EvalArgs(over,specs,lp,input) ")" 
+			{
+				cond = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcName, args, DataType.BOOLEAN);
+				lp.add(cond);
+				log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp);
+				for(ExpressionOperator exprOp: args) {
+					lp.connect(exprOp, cond);
+					log.debug("PUnaryCond: Connected operator " + exprOp.getClass().getName() + " " + exprOp + " to " + cond + " logical plan " + lp);
+				}
+			}
+		)
+|	cond = PNotCond(over,specs,lp,input)
 
 	)
-	{return cond;}
+	{log.trace("Exiting PUnaryCond"); return cond;}
 }
 
-Cond PNotCond(Schema over, Map<String, EvalSpec> specs) : {Cond c1;}
+ExpressionOperator PNotCond(Schema over, Map<String, LogicalOperator> specs,LogicalPlan lp,LogicalOperator input) : 
 {
-	<NOT> c1=PUnaryCond(over,specs)
-	{return new NotCond(c1);}
+	ExpressionOperator c1;
+	log.trace("Entering PNotCond");
+}
+{
+	<NOT> c1=PUnaryCond(over,specs,lp,input)
+	{
+		ExpressionOperator eOp = new LONot(lp, new OperatorKey(scope, getNextId()), c1);
+		lp.add(eOp);
+		log.debug("PNotCond: Added operator " + eOp.getClass().getName() + " " + eOp + " to logical plan " + lp);
+		lp.connect(c1, eOp);
+		log.debug("PNotCond: Connected operator " + eOp.getClass().getName() + " " + eOp + " to " + c1 + " logical plan " + lp);
+		log.trace("Exiting PNotCond");
+		return eOp;
+	}
 }
 
 
 
-LogicalOperator CogroupClause() : {CogroupInput gi; ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();}
+LogicalOperator CogroupClause(LogicalPlan lp) : 
 {
-	(gi = GroupItem() { gis.add(gi); }
-	("," gi = GroupItem() { gis.add(gi); })*)
-	{return parseCogroup(gis);}
+	CogroupInput gi; 
+	ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>(); 
+	LogicalOperator cogroup; 
+	log.trace("Entering CoGroupClause");
 }
+{
 
+	(gi = GroupItem(lp) { gis.add(gi); }
+	("," gi = GroupItem(lp) { gis.add(gi); })*)
+	{
+		cogroup = parseCogroup(gis, lp);
+		log.trace("Exiting CoGroupClause");
+		return cogroup;		
+	}
 
-CogroupInput GroupItem() : {LogicalOperator op; GenerateSpec gs; EvalSpec es; LogicalOperator cgOp; EvalSpec cgSpec;}
+}
+
+CogroupInput GroupItem(LogicalPlan lp) : 
+{
+	ExpressionOperator es; 
+	LogicalOperator cgOp; 
+	boolean isInner = false; 
+	ArrayList<LogicalPlan> listPlans = new ArrayList<LogicalPlan>(); 
+	LogicalPlan groupByPlan;
+	ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
+	log.trace("Entering GroupItem");
+	log.debug("LogicalPlan: " + lp);
+}
 {
 	(
-	cgOp = NestedExpr()
-	(
-	( <BY> 	
-	( 
-	LOOKAHEAD ( "(" FlattenedGenerateItemList(cgOp.outputSchema()) ")" )
-	( "(" gs = FlattenedGenerateItemList(cgOp.outputSchema(), null) ")" )
-|	(es = FlattenedGenerateItem(cgOp.outputSchema(), null) {gs = new GenerateSpec(es);})
-	)
-	)	
-|	<ALL> {gs = new GenerateSpec(new ConstSpec("all"));}
-|	<ANY> {gs = new GenerateSpec(new FuncEvalSpec(pigContext, GFAny.class.getName(), null));}
-	)
-	{ 
-		cgSpec = gs.getGroupBySpec();
-	}
-	[<INNER> {cgSpec.setInner(true);} | <OUTER>]
+		cgOp = NestedExpr(lp)
+		(
+			( <BY> 
+				( 
+					LOOKAHEAD ( "(" FlattenedGenerateItemList(cgOp.getSchema(), null, groupByPlan, cgOp) ")" )
+					( "(" es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList) 
+						{listPlans.add(groupByPlan);}
+						(
+							"," es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList) 
+							{listPlans.add(groupByPlan);}
+						)*
+						")" 
+					)
+				|	(
+						es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList) 
+						{listPlans.add(groupByPlan);}
+					)
+				)
+			)	
+		|	<ALL> {
+					es = new LOConst(groupByPlan = new LogicalPlan(), new OperatorKey(scope, getNextId()), "all"); 
+					groupByPlan.add(es);
+					log.debug("GroupItem: Added operator " + es.getClass().getName() + " " + es + " to logical plan " + groupByPlan);
+					listPlans.add(groupByPlan);
+			}
+		|	<ANY> {
+					es = new LOUserFunc(groupByPlan = new LogicalPlan(), new OperatorKey(scope, getNextId()), GFAny.class.getName(), null, DataType.INTEGER); 
+					groupByPlan.add(es);
+					log.debug("GroupItem: Added operator " + es.getClass().getName() + " " + es + " to logical plan " + groupByPlan);
+					listPlans.add(groupByPlan);
+			}
+		)
+		[<INNER> {isInner = true;} | <OUTER>]
 	)
 	{
 		CogroupInput cogroupInput = new CogroupInput(); 
 
-		cogroupInput.spec = cgSpec;
-		cogroupInput.op = cgOp.getOperatorKey();
+		cogroupInput.plans = listPlans;
+		cogroupInput.op = cgOp;
+		cogroupInput.isInner = isInner;
 		
+		log.trace("Exiting GroupItem");		
 		return cogroupInput;
     }
 }
 
-
-LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; ProjectSpec projSpec; String funcName;}
+LogicalOperator OrderClause(LogicalPlan lp) : 
+{
+	LogicalOperator op; 
+	ExpressionOperator col; 
+	boolean star = false; 
+	ArrayList<ExpressionOperator> sortCols = new ArrayList<ExpressionOperator>(); 
+	ArrayList<LogicalPlan> sortColPlans = new ArrayList<LogicalPlan>(); 
+	ArrayList<Boolean> ascOrder = new ArrayList<Boolean>(); 
+	boolean asc = true; 
+	String funcName = null; 
+	log.trace("Entering OrderClause");
+}
 {
 	(
-	op = NestedExpr() <BY> 
+	op = NestedExpr(lp) <BY> 
 	(
 	    ( 
-		(projSpec = SimpleProj(op.outputSchema()) |	( "(" projSpec = SimpleProj(op.outputSchema()) ")"))
-		{
-			projSpec.setWrapInTuple(true);
-			projSpec.setFlatten(true);
-			sortSpec = new GenerateSpec(projSpec);
-		}
+		(
+		col = SortCol(op.getSchema(), lp, op, ascOrder, sortColPlans) 
+		("," col = SortCol(op.getSchema(), lp, op, ascOrder, sortColPlans))*		
 		)
-	|	(sortSpec = Star() {sortSpec = new GenerateSpec(sortSpec);})
+	)
+	|	<STAR> {star = true;} [<ASC> | <DESC> {asc = false;}] 
+		{
+			if(asc) {
+				ascOrder.add(true);
+			} else {	
+				ascOrder.add(false);
+			}
+		}	
 	)
     (
         <USING>  funcName = QualifiedFunction()
-        {
-            try {
-                sortSpec.setComparatorName(funcName);
-            } catch (Exception e){
-                throw new ParseException(e.getMessage());
-            }
-        }
     )?
 
 	)
 	{
-		return new LOSort(opTable, scope, getNextId(), op.getOperatorKey(), sortSpec);
+		LOSort sort = new LOSort(lp, new OperatorKey(scope, getNextId()), op, sortColPlans, ascOrder, funcName);
+		sort.setStar(star);
+		lp.add(sort);
+		log.debug("Added operator " + sort.getClass().getName() + " to the logical plan");
+		
+		lp.connect(op, sort);
+		log.debug("Connecting sort input alias " + op.getAlias() + " operator " + op.getClass().getName() + " to operator " + sort.getClass().getName() + " in the logical plan");
+		
+		log.trace("Exiting OrderClause");
+		return sort;		
 	}
 }
-	
 
-LogicalOperator CrossClause() : {LogicalOperator op; ArrayList<OperatorKey> inputs = new ArrayList<OperatorKey>();}
+
+ExpressionOperator SortCol(Schema over, LogicalPlan lp, LogicalOperator op, ArrayList<Boolean> ascOrder, ArrayList<LogicalPlan> sortColPlans) : 
+{
+	ExpressionOperator col; 
+	boolean asc = true; 
+	LogicalPlan sortColPlan = new LogicalPlan(); 
+	log.trace("Entering SortCol");}
+{
+	(
+		col = ColOrSpec(op.getSchema(), null, sortColPlan, op) [<ASC> | <DESC> {asc = false;}]
+		{
+			if(asc) {
+				log.debug("Ascending");
+				ascOrder.add(true);
+			} else {
+				log.debug("Descending");	
+				ascOrder.add(false);
+			}
+			sortColPlans.add(sortColPlan);
+		}
+		|
+		( 
+			"(" col = ColOrSpec(op.getSchema(), null, sortColPlan, op) ")" [<ASC> | <DESC> {asc = false;}]
+			{
+				if(asc) {
+					log.debug("Ascending");
+					ascOrder.add(true);
+				} else {
+					log.debug("Descending");	
+					ascOrder.add(false);
+				}
+				sortColPlans.add(sortColPlan);
+			}
+		)
+	)
+	{
+		log.trace("Exiting SortCol");
+		return col;
+	}	
+}
+
+int ColName(Schema over) : 
+{
+	Token t; 
+	log.trace("Entering ColName");
+}
 {
 	(
-	op = NestedExpr() { inputs.add(op.getOperatorKey()); }
-	("," op = NestedExpr() { inputs.add(op.getOperatorKey()); })*
+	t = <DOLLARVAR> {return undollar(t.image);}
+	|
+	t = <IDENTIFIER> 
+	{	int i;
+		
+		if ( over == null ||  (i = over.getPosition(t.image)) == -1) {
+			throw new ParseException("Invalid alias: " + t.image + " in " + over);
+		} 
+		
+		log.trace("Exiting ColName");
+		return i;
+	}
 	)
-	{return rewriteCross(inputs);}
+}		
+
+LogicalOperator CrossClause(LogicalPlan lp) : 
+{
+	LogicalOperator op; 
+	ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>(); 
+	log.trace("Entering CrossClause");
+}
+{
+	(
+	op = NestedExpr(lp) { inputs.add(op); }
+	("," op = NestedExpr(lp) { inputs.add(op); })*
+	)
+	{
+		LogicalOperator cross = new LOCross(lp, new OperatorKey(scope, getNextId()), inputs);
+		lp.add(cross);
+		log.debug("Added operator " + cross.getClass().getName() + " to the logical plan");
+		
+		for (LogicalOperator lop: inputs) {
+				lp.connect(lop, cross);	
+				log.debug("Connected operator " + lop.getClass().getName() + " " + lop + " to " + cross + " logical plan " + lp);
+		}
+		log.debug("Connected cross inputs to the cross operator");
+		
+		log.trace("Exiting CrossClause");
+		return cross;
+	}
 }
 
-LogicalOperator JoinClause() : {CogroupInput gi; ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();}
+LogicalOperator JoinClause(LogicalPlan lp) : 
 {
-	(gi = GroupItem() { gis.add(gi); }
-	("," gi = GroupItem() { gis.add(gi); })*)
-	{return rewriteJoin(gis);}
+	CogroupInput gi; 
+	ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>(); 
+	log.trace("Entering JoinClause");
+	log.debug("LogicalPlan: " + lp);
+}
+{
+	(gi = GroupItem(lp) { gis.add(gi); }
+	("," gi = GroupItem(lp) { gis.add(gi); })*)
+	{log.trace("Exiting JoinClause"); return rewriteJoin(gis, lp);}
+	
 }
 
-LogicalOperator UnionClause() : {LogicalOperator op; ArrayList<OperatorKey> inputs = new ArrayList<OperatorKey>();}
+LogicalOperator UnionClause(LogicalPlan lp) : 
 {
-	(op = NestedExpr() { inputs.add(op.getOperatorKey()); }
-	("," op = NestedExpr() { inputs.add(op.getOperatorKey()); })*)
-	{return new LOUnion(opTable, scope, getNextId(), inputs);}
+	LogicalOperator op;
+	ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>(); 
+	log.trace("Entering UnionClause");
+}
+{
+	(op = NestedExpr(lp){inputs.add(op);} 
+	("," op = NestedExpr(lp) {inputs.add(op);})*)
+	{
+		LogicalOperator union = new LOUnion(lp, new OperatorKey(scope, getNextId()), inputs);
+		lp.add(union);
+		log.debug("Added operator " + union.getClass().getName() + " to the logical plan");
+		
+		for (LogicalOperator lop: inputs) {
+			lp.connect(lop, union);
+			log.debug("Connected union input operator " + lop.getClass().getName() + " to operator " + union.getClass().getName() + " in the logical plan");
+		}		
+		
+		log.trace("Exiting UnionClause");
+		return union;
+	}
 }
 
 
-LogicalOperator ForEachClause() : {EvalSpec spec = null; LogicalOperator input, op; }
+LogicalOperator ForEachClause(LogicalPlan lp) : 
+{
+	ArrayList<LogicalOperator> specList = new ArrayList<LogicalOperator>(); 
+	LogicalOperator input, foreach; 
+	LogicalPlan foreachPlan = new LogicalPlan();
+	log.trace("Entering ForEachClause");
+}
 {
 	(
-	input = NestedExpr()
-	spec = NestedBlock(input.outputSchema())
+	input = NestedExpr(lp)
+	specList = NestedBlock(input.getSchema(), specList, foreachPlan, input)
 	)
 	{
-		op = new LOEval(opTable, scope, getNextId(), input.getOperatorKey(), spec);
-		return op;
+		foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), foreachPlan);
+		try {
+			lp.add(foreach);
+			log.debug("Added operator " + foreach.getClass().getName() + " to the logical plan");
+		
+			lp.connect(input, foreach);
+			log.debug("Connected alias " + input.getAlias() + " operator " + input.getClass().getName() + " object " + input + " to operator " + foreach.getClass().getName() + " in the logical plan");
+		} catch (PlanException planException) {
+			ParseException parseException = new ParseException(planException.getMessage());
+			throw parseException;
+		}
+		
+		log.trace("Exiting ForEachClause");
+		return foreach;
 	}
 }
 
-EvalSpec NestedBlock(Schema over):
-{EvalSpec spec; Map<String, EvalSpec> specs = new HashMap<String, EvalSpec>();}
+ArrayList<LogicalOperator> NestedBlock(Schema over, ArrayList<LogicalOperator> specList, LogicalPlan lp, LogicalOperator input):
+{
+	LogicalOperator spec; 
+	Map<String, LogicalOperator> specs = new HashMap<String, LogicalOperator>(); 
+	log.trace("Entering NestedBlock");
+}
 {
 	(
-	spec = GenerateStatement(over,specs)
-|	("{" (NestedCommand(over,specs) ";")* spec = GenerateStatement(over,specs)	 ";" "}")
+	spec = GenerateStatement(over,specs, lp, input) {specList.add(spec);}
+|	("{" (NestedCommand(over,specs,specList, lp, input) ";")* spec = GenerateStatement(over,specs,lp,input)	 ";" "}")
+	{specList.add(spec);}
 	)
-	{return spec;}
+	{log.trace("Exiting NestedBlock"); return specList;}
 }
 
-void NestedCommand(Schema over, Map<String, EvalSpec> specs):
-{Token t; EvalSpec item;}
+void NestedCommand(Schema over, Map<String, LogicalOperator> specs, List<LogicalOperator> specList, LogicalPlan lp, LogicalOperator input):
+{
+	Token t; 
+	LogicalOperator item; 
+	ExpressionOperator eOp = null; 
+	log.trace("Entering NestedCommand");
+}
 {
 	(
 	t = <IDENTIFIER> "="
 	(
-	item = InfixExpr(over,specs)
-|	item = NestedFilter(over,specs)	 
-| 	item = NestedSortOrArrange(over,specs)
-|	item = NestedDistinct(over,specs)	
+	eOp = InfixExpr(over,specs,lp,input) 
+	{
+		item = eOp;
+		lp.add(eOp);
+		log.debug("Added operator " + eOp.getClass().getName() + " " + eOp + " to the logical plan " + lp);
+	}
+|	item = NestedFilter(over,specs,lp, input)	 
+| 	item = NestedSortOrArrange(over,specs,lp, input)
+|	item = NestedDistinct(over,specs,lp, input)	
 	)
 	)	
-	{specs.put(t.image,item);}
+	{
+		String alias = t.image;
+		item.setAlias(alias);
+		specs.put(alias,item);
+		log.debug("Added " + alias + " to the specs map");
+		specList.add(item);
+		log.debug("Added " + alias + " to the specList");
+		
+		log.trace("Exiting NestedCommand");
+	}
 }		
 
-EvalSpec NestedFilter(Schema over, Map<String, EvalSpec> specs):
-{Cond cond; EvalSpec item; Schema subSchema = null;}
+LogicalOperator NestedFilter(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
 {
-	<FILTER> item = BaseEvalSpec(over,specs) { subSchema = item.getOutputSchemaForPipe(over); }
-	<BY> cond = PCond(subSchema,null)
-	{ return copyItemAndAddSpec(item,new FilterSpec(cond)); }
+	ExpressionOperator cond; 
+	Schema subSchema = null; 
+	ExpressionOperator eOp; 
+	LogicalPlan conditionPlan = new LogicalPlan(); 
+	log.trace("Entering NestedFilter");
+}
+{
+	(
+	<FILTER>  eOp = BaseEvalSpec(over, specs, lp, input)
+	{subSchema = eOp.getSchema();}
+	<BY> cond = PCond(subSchema,null,conditionPlan,input)
+	)
+	{ 
+		lp.add(eOp);
+		log.debug("Added " + eOp.getAlias() + " to the logical plan");
+		LogicalOperator filter = new LOFilter(lp, new OperatorKey(scope, getNextId()), conditionPlan, eOp);
+		lp.add(filter);
+		log.debug("Added nested filter operator " + filter.getClass().getName() + " to the logical plan");
+		
+		lp.connect(eOp, filter);
+		log.debug("Connected the filter input to the filter");
+		
+		log.trace("Exiting NestedFilter");
+		return filter;  
+	}
+
 }
 
-EvalSpec NestedSortOrArrange(Schema over, Map<String, EvalSpec> specs):
-{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = null; Token t; String funcName;}
+LogicalOperator NestedSortOrArrange(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
+{
+	ExpressionOperator col;	
+	boolean star = false; 
+	ArrayList<LogicalPlan> sortColPlans = new ArrayList<LogicalPlan>(); 
+	ArrayList<Boolean> ascOrder = new ArrayList<Boolean>(); 
+	String funcName = null; 
+	ExpressionOperator eOp;
+	Token t; 
+	boolean asc = true; 
+	log.trace("Entering NestedSortOrArrange");}
 {
 	(
 	( t = <ORDER> | t = <ARRANGE> )
-	item = BaseEvalSpec(over,specs) { subSchema = item.getOutputSchemaForPipe(over); }
-	<BY> ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;})
-		| sortSpec = Star() )     
+	(
+		eOp = BaseEvalSpec(over, specs, lp, input)
+		{
+			try{
+				eOp.getSchema();
+			}catch(FrontendException fe) {
+				ParseException pe = new ParseException(fe.getMessage());
+				throw pe;
+			}
+			log.debug("Before BY");
+		}		
+	)
+	<BY> 
+	(
+		(
+			col = SortCol(eOp.getSchema(), lp, eOp, ascOrder, sortColPlans) 
+			("," col = SortCol(eOp.getSchema(), lp, eOp, ascOrder, sortColPlans) )*		
+			
+		)
+		| <STAR> {star = true;} [<ASC> | <DESC> {asc = false;}] 
+			{
+				if(asc) {
+					ascOrder.add(true);
+				} else {	
+					ascOrder.add(false);
+				}
+			}		
+	)     
     (
         <USING>  funcName = QualifiedFunction()
-        {
-            try {
-                sortSpec.setComparatorName(funcName);
-            } catch (Exception e){
-                throw new ParseException(e.getMessage());
-            }
-        }
     )?
 	)
-	{ return copyItemAndAddSpec(item,new SortDistinctSpec(false, sortSpec)); }
+	{	
+		log.debug("Before creating LOSort");
+		LOSort sort = new LOSort(lp, new OperatorKey(scope, getNextId()), eOp, sortColPlans, ascOrder, funcName);
+		sort.setStar(star);
+		log.debug("After creating LOSort");
+		try {
+			lp.add(eOp);
+			log.debug("Added " + eOp +  " " + eOp.getClass().getName() + " to the logical plan");
+			lp.add(sort);
+			log.debug("Added operator " + sort.getClass().getName() + " to the logical plan");
+		
+			lp.connect(eOp, sort);
+			log.debug("Connected alias " + eOp.getAlias() + " operator " + eOp.getClass().getName() + " to operator " + sort.getClass().getName() + " in the logical plan");
+		} catch (PlanException planException) {
+			ParseException parseException = new ParseException(planException.getMessage());
+			throw parseException;
+		}
+		
+		log.trace("Exiting NestedSortOrArrange");
+		return sort;		
+	}
 }
 	
-EvalSpec NestedDistinct(Schema over, Map<String, EvalSpec> specs):
-{EvalSpec item; LogicalOperator subOp = null; Token t;}
+LogicalOperator NestedDistinct(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
+{
+	Token t; 
+	ExpressionOperator eOp; 
+	log.trace("Entering NestedDistinct");
+}
 {
 	(
-	<DISTINCT>
-	item = BaseEvalSpec(over,specs)
+	<DISTINCT> eOp = BaseEvalSpec(over, specs, lp, input)
 	)
 	{ 
-		return copyItemAndAddSpec(item,new SortDistinctSpec(true, null)); 
+		lp.add(eOp);
+		log.debug("Added " + eOp.getAlias() + " to the logical plan");
+		LogicalOperator distinct = new LODistinct(lp, new OperatorKey(scope, getNextId()), eOp);
+		lp.add(distinct);
+		log.debug("Added operator " + distinct.getClass().getName() + " to the logical plan");
+		
+		lp.connect(eOp, distinct);
+		log.debug("Connected alias " + eOp.getAlias() + " operator " + eOp.getClass().getName() + " to operator " + distinct.getClass().getName() + " in the logical plan");
+		
+		log.trace("Exiting NestedDistinct");
+		return distinct; 
 	}
 }
 	
 	
-GenerateSpec GenerateStatement(Schema over, Map<String, EvalSpec> specs):
-{GenerateSpec spec = null; TupleSchema schema;}
+LogicalOperator GenerateStatement(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
+{
+	LogicalOperator spec = null; 
+	Schema schema;
+	setInsideGenerate(true);
+	log.trace("Entering GenerateStatement");
+}
 {
 	(
 	<GENERATE>
-	spec = FlattenedGenerateItemList(over,specs)
+	spec = FlattenedGenerateItemList(over,specs,lp,input)
 	)
 	{
+		log.debug("Connecting generate inputs");
+		for(LogicalOperator op: getGenerateInputs()) {
+			lp.connect(op, spec);
+			log.debug("Connected operator: " + op.getClass().getName() + " " + op + " to " + spec + " in logical plan " + lp);
+		}
+		resetGenerateState();
+		log.trace("Exiting GenerateStatement");
 		return spec;
 	}
 }
 
-GenerateSpec FlattenedGenerateItemList(Schema over, Map<String, EvalSpec> specs):
-{ArrayList<EvalSpec> specList = new ArrayList<EvalSpec>(); EvalSpec item;}
+LogicalOperator FlattenedGenerateItemList(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
+{
+	ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>(); 
+	ArrayList<Boolean> flattenList = new ArrayList<Boolean>(); 
+	ExpressionOperator item;
+	LogicalPlan generatePlan;
+	log.trace("Entering FlattenedGenerateItemList");
+}
 {
 	(
-	item = FlattenedGenerateItem(over,specs) {specList.add(item);}
-	("," item = FlattenedGenerateItem(over,specs) {specList.add(item);})*
+	item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList) {generatePlans.add(generatePlan);}
+	("," item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList) {generatePlans.add(generatePlan);})*
+	
 	)
-	{return new GenerateSpec(specList);}
+	{
+		LogicalOperator generate = new LOGenerate(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList);
+		lp.add(generate);
+		log.debug("Added operator " + generate.getClass().getName() + " to the logical plan");
+		log.trace("Exiting FlattenedGenerateItemList");
+		return generate;
+	}
 }
 	
 
-EvalSpec FlattenedGenerateItem(Schema over, Map<String, EvalSpec> specs): 
-{EvalSpec item; Schema schema = null;}
+ExpressionOperator FlattenedGenerateItem(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input, ArrayList<Boolean> flattenList): 
+{
+	ExpressionOperator item; 
+	Schema schema = null; 
+	Token t; 
+	Schema.FieldSchema fs; 
+	boolean flatten = false;
+	log.trace("Entering FlattenedGenerateItem");
+}
 {
 	(
 	(
-	(	<FLATTEN> "(" item = InfixExpr(over,specs) ")" 
+	(
+		<FLATTEN> "(" item = InfixExpr(over,specs,lp,input) ")" 
 		{
-			item.setFlatten(true);
+			flatten = true;
+		}
+	)
+|	(item = InfixExpr(over,specs,lp,input))
+|	( <STAR> 
+		{
+			LOProject project = new LOProject(lp, new OperatorKey(scope, getNextId()), input, -1); 
+			project.setStar(true); 
+			item = project;
+			lp.add(project);
+			log.debug("FGItem: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp);
 		}
 	)
-|	(item = InfixExpr(over,specs))
-|	(item = Star())
 	)
-	[ <AS> schema = Schema() ]
+	[ <AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" | fs = AtomSchema() {schema = new Schema(fs);})]
 	)
 	{
-		item.setSchema(schema);
+		log.debug("item: " + item.getClass().getName());
+		if(null != schema) {
+			item.setSchema(schema); 
+			log.debug("Printing Schema Aliases from parser"); 
+			schema.printAliases();
+			log.debug("Printing Schema Aliases from item"); 
+			item.getSchema().printAliases();			
+		}
+		flattenList.add(flatten);
+		log.trace("Exiting FlattenedGenerateItem");
 		return item;
 	}
 }
 	
-EvalSpec InfixExpr(Schema over, Map<String, EvalSpec> specs) : { EvalSpec expr; }
+ExpressionOperator InfixExpr(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
 {
-	expr = AdditiveExpr(over,specs) 
-	{return expr;}
+	ExpressionOperator expr; 
+	log.trace("Entering InFixExpr");
+}
+{
+	expr = AdditiveExpr(over,specs,lp,input) 
+	{log.trace("Exiting InFixExpr");return expr;}
 }
 
-EvalSpec AdditiveExpr(Schema over,  Map<String, EvalSpec> specs) : { Token t; EvalSpec lhs, rhs; GenerateSpec args;  }
+ExpressionOperator AdditiveExpr(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
+{ 
+	Token t; 
+	ExpressionOperator lhs, rhs, exprOp; 
+	log.trace("Entering AdditiveExpr");
+}
 {
 	(
-	lhs = MultiplicativeExpr(over,specs) 	
+	lhs = MultiplicativeExpr(over,specs,lp,input) 	
 		(
-		( t = "+" | t = "-" ) rhs = MultiplicativeExpr(over,specs)
+		( t = "+" | t = "-" ) rhs = MultiplicativeExpr(over,specs,lp,input)
 		 	
 		{
 			assertAtomic(lhs,true);
 			assertAtomic(rhs,true);
-			ArrayList<EvalSpec> argsList = new ArrayList<EvalSpec>();
-			argsList.add(lhs);
-			argsList.add(rhs);
-			args = new GenerateSpec(argsList);
 			if (t.image.equals("+")){
-				lhs = new FuncEvalSpec(pigContext, ADD.class.getName(), args);
+				exprOp = new LOAdd(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
 			}else{
-				lhs = new FuncEvalSpec(pigContext, SUBTRACT.class.getName(), args);
+				exprOp = new LOSubtract(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
 			}
+			lp.add(exprOp);
+			log.debug("AdditiveExpr: Added operator " + exprOp.getClass().getName() + " " + exprOp + " to logical plan " + lp);
+			lp.connect(lhs, exprOp);
+			log.debug("AdditiveExpr: Connected operator " + lhs.getClass().getName() + " " + lhs+ " to " + exprOp + " logical plan " + lp);
+			lp.connect(rhs, exprOp);
+			log.debug("AdditiveExpr: Connected operator " + rhs.getClass().getName() + " " + rhs+ " to " + exprOp + " logical plan " + lp);
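+			// fold the result back into lhs so that chains of '+' and '-' associate left to right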
+			lhs = exprOp;
 		}
 		)*
 	)
-	{return lhs;}		
+	{
+		log.trace("Exiting AdditiveExpr");
+		return lhs;
+	}		
 }
 
-EvalSpec MultiplicativeExpr(Schema over, Map<String, EvalSpec> specs) : { Token t; EvalSpec lhs, rhs; GenerateSpec args; }
+ExpressionOperator MultiplicativeExpr(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
+{ 
+	Token t; 
+	ExpressionOperator lhs, rhs, exprOp; 
+	log.trace("Entering MultiplicativeExpr");
+}
 {
 		(
-		lhs = UnaryExpr(over,specs)
+		lhs = UnaryExpr(over,specs,lp,input)
 		(
-		( t = <STAR> | t = "/" ) rhs = UnaryExpr(over,specs) 			
+		( t = <STAR> | t = "/" | t = "%") rhs = UnaryExpr(over,specs,lp,input) 			
 		{
 			assertAtomic(lhs,true);
 			assertAtomic(rhs,true);
-			ArrayList<EvalSpec> argsList = new ArrayList<EvalSpec>();
-			argsList.add(lhs);
-			argsList.add(rhs);
-			args = new GenerateSpec(argsList);
 			if (t.image.equals("*")){
-				lhs = new FuncEvalSpec(pigContext, MULTIPLY.class.getName(), args);
-			}else{
-				lhs = new FuncEvalSpec(pigContext, DIVIDE.class.getName(), args);
+				exprOp = new LOMultiply(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+			}else if (t.image.equals("/")){
+				exprOp = new LODivide(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+			}else {
+				exprOp = new LOMod(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
 			}
+			lp.add(exprOp);
+			log.debug("MultiplicativeExpr: Added operator " + exprOp.getClass().getName() + " " + exprOp + " to logical plan " + lp);
+			lp.connect(lhs, exprOp);
+			log.debug("MultiplicativeExpr: Connected operator " + lhs.getClass().getName() + " " + lhs+ " to " + exprOp + " logical plan " + lp);
+			lp.connect(rhs, exprOp);
+			log.debug("MultiplicativeExpr: Connected operator " + rhs.getClass().getName() + " " + rhs+ " to " + exprOp + " logical plan " + lp);
+			lhs = exprOp;
 		}
 		)*
 		)
-		{return lhs;}
+		{
+			log.trace("Exiting MultiplicativeExpr");
+			return lhs;
+		}
 }
 
-EvalSpec UnaryExpr(Schema over,  Map<String, EvalSpec> specs) : { EvalSpec expr; }
+ExpressionOperator UnaryExpr(Schema over,  Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
+{
+	ExpressionOperator expr; 
+	log.trace("Entering UnaryExpr");
+}
 {
 	(
-	LOOKAHEAD(BaseEvalSpec(over,specs)) expr = BaseEvalSpec(over,specs)
-|	( "(" expr = InfixExpr(over,specs) ")" )
+	LOOKAHEAD(BaseEvalSpec(over,specs,lp,input)) expr = BaseEvalSpec(over,specs,lp,input)
+|	( "(" expr = InfixExpr(over,specs,lp,input) ")" )
+|	expr = NegativeExpr(over,specs,lp,input)
+
 	)
-	{return expr;}
+	{log.trace("Exiting UnaryExpr");return expr;}
 }
 
+ExpressionOperator NegativeExpr(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
+{
+	ExpressionOperator c1;
+	LogicalPlan exprPlan = new LogicalPlan();
+	log.trace("Entering NegativeExpr");
+}
+{
+	"-" c1=UnaryExpr(over,specs,lp,input)
+	{
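+		// unary minus: wrap the operand in an LONegative node and connect it into the plan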
+		ExpressionOperator eOp = new LONegative(lp, new OperatorKey(scope, getNextId()), c1);
+		lp.add(eOp);
+		log.debug("NegativeExpr: Added operator " + eOp.getClass().getName() + " " + eOp + " to logical plan " + lp);
+		lp.connect(c1, eOp);
+		log.trace("Exiting NegativeExpr");
+		return eOp;
+	}
+}
+
+
 	
-EvalSpec BaseEvalSpec(Schema over, Map<String, EvalSpec> specs) :
-{EvalSpec item;EvalSpec projection; Schema subSchema = null; Token t;}
+ExpressionOperator BaseEvalSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
+{
+	ExpressionOperator item;
+	ExpressionOperator projection; 
+	Schema subSchema = null; 
+	Token t; 
+	log.trace("Entering BaseEvalSpec");
+}
 {
 	(
-	item = Const()
+	item = Const(lp)
 |	(
 	(
-		LOOKAHEAD(FuncEvalSpec(over,specs))
-		item = FuncEvalSpec(over,specs)
-	|	item = ColOrSpec(over,specs) 
-	| 	item = BinCond(over,specs)
+		LOOKAHEAD(FuncEvalSpec(over,specs,lp,input))
+		item = FuncEvalSpec(over,specs,lp,input)
+	|	item = ColOrSpec(over,specs,lp,input) 
+	| 	item = BinCond(over,specs,lp,input)
+	
 	)
-	{item = item.copy(pigContext);}
 	(
-		{ subSchema = item.getOutputSchemaForPipe(over); }	
+		{ 
+			subSchema = item.getSchema(); 
+			//TODO
+			//HACK for the schema problems with LOProject
+			//Check the schema to see if the constituent
+			//field is a bag or a tuple. If so, then get
+			//that schema and send it out instead of the
+			//actual schema
+			log.debug("Printing subSchema Aliases");
+			subSchema.printAliases();
+			
+			log.debug("Printing the field schemas of subSchema");
+			for(Schema.FieldSchema fs: subSchema.getFields()) {
+				log.debug("fs: " + fs);
+				subSchema = fs.schema;
+			}
+			
+		}
 		( 
-			"." projection = BracketedSimpleProj(subSchema) 
+			"." projection = BracketedSimpleProj(subSchema,lp,item) 
 			{
 				assertAtomic(item,false); 
-				item = item.addSpec(projection);
+				lp.remove(item);
+				item = projection;
 			}
 		)
 |		( "#" t = <QUOTEDSTRING> { 
 			assertAtomic(item, false);
-			item = item.addSpec(new MapLookupSpec(unquote(t.image)));
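+			// '#' looks up a key in a map-valued field: wrap the current item in an LOMapLookup keyed by the quoted string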
+			ExpressionOperator mapLookup = new LOMapLookup(lp, new OperatorKey(scope, getNextId()), item, unquote(t.image), DataType.BYTEARRAY, null);
+			lp.add(mapLookup);
+			log.debug("BaseEvalSpec: Added operator " + mapLookup.getClass().getName() + " " + mapLookup + " to logical plan " + lp);
+			lp.connect(item, mapLookup);
+			log.debug("BaseEvalSpec: Connected operator " + item.getClass().getName() + " " + item + " to " + mapLookup + " logical plan " + lp);
+			item = mapLookup;
 		}
 		)
-	)*
+	)*	
 	)
 	)
-	{return item;}
+	{log.trace("Exiting BaseEvalSpec"); return item;}
 }
 
 
 
-EvalSpec BinCond(Schema over, Map<String, EvalSpec> specs):
-{Cond cond; EvalSpec ifTrue, ifFalse; EvalSpec ret = null;}
+ExpressionOperator BinCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
+{
+	ExpressionOperator cond; 
+	ExpressionOperator ifTrue, ifFalse; 
+	LogicalPlan conditionPlan = new LogicalPlan();
+	LogicalPlan truePlan = new LogicalPlan();
+	LogicalPlan falsePlan = new LogicalPlan();
+	log.trace("Entering BinCond");
+}
 {	
 	(
-	"(" cond = PCond(over,specs) "?" ifTrue = InfixExpr(over,specs) 
-	":" ifFalse = InfixExpr(over,specs) ")"
+	"(" cond = PCond(over,specs,lp,input) "?" ifTrue = InfixExpr(over,specs,lp,input) 
+	":" ifFalse = InfixExpr(over,specs,lp,input) ")"
+	
 	)
-	{ return new BinCondSpec(cond,ifTrue,ifFalse);}
+	{ 
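+		// build an LOBinCond over the condition and the two result expressions, then connect all three into the plan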
+		//ExpressionOperator bincond = new LOBinCond(lp, new OperatorKey(scope, getNextId()), conditionPlan, truePlan, falsePlan);
+		ExpressionOperator bincond = new LOBinCond(lp, new OperatorKey(scope, getNextId()), cond, ifTrue, ifFalse);
+		//TODO - Need to connect expression operators with the new plan
+		lp.add(bincond);
+		log.debug("BinCond: Added operator " + bincond.getClass().getName() + " " + bincond + " to logical plan " + lp);
+		lp.connect(cond, bincond);
+		lp.connect(ifTrue, bincond);
+		lp.connect(ifFalse, bincond);
+		log.trace("Exiting BinCond");
+		return bincond;
+	}
 }
 
 
-EvalSpec FuncEvalSpec(Schema over, Map<String, EvalSpec> specs) : {String funcName; GenerateSpec args;}
+ExpressionOperator FuncEvalSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
 {
-	funcName=EvalFunction() "(" args=EvalArgs(over,specs) ")" 
-	{return new FuncEvalSpec(pigContext, funcName, args);}
+	String funcName; 
+	List<ExpressionOperator> args;
+	log.trace("Entering FuncEvalSpec");
+}
+{
+	funcName=EvalFunction() "(" args=EvalArgs(over,specs,lp,input) ")" 
+	{
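+		// build an LOUserFunc for the function call and connect every argument expression to it; the result type defaults to bytearray here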
+		ExpressionOperator userFunc = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcName, args, DataType.BYTEARRAY);
+		lp.add(userFunc);
+		log.debug("FuncEvalSpec: Added operator " + userFunc.getClass().getName() + " " + userFunc + " to logical plan " + lp);
+		for(ExpressionOperator exprOp: args) {
+			lp.connect(exprOp, userFunc);
+			log.debug("FuncEvalSpec: Connected operator " + exprOp.getClass().getName() + " " + exprOp+ " to " + userFunc + " logical plan " + lp);
+		}
+		log.trace("Exiting FuncEvalSpec");
+		return userFunc;
+	}
 }
 
-GenerateSpec EvalArgs(Schema over, Map<String, EvalSpec> specs) : {ArrayList<EvalSpec> specList = new ArrayList<EvalSpec>(); EvalSpec item;}
+List<ExpressionOperator> EvalArgs(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
+{
+	ArrayList<ExpressionOperator> specList = new ArrayList<ExpressionOperator>(); 
+	ExpressionOperator item;
+	log.trace("Entering EvalArgs");
+}
 {
 	(
-	(item=EvalArgsItem(over,specs)	{specList.add(item);}
-	("," item=EvalArgsItem(over,specs) {specList.add(item);})*)
+	(item=EvalArgsItem(over,specs,lp,input)	{specList.add(item);}
+	("," item=EvalArgsItem(over,specs,lp,input) {specList.add(item);})*)
+	
 	| {}
 	)
 	{
-		return new GenerateSpec(specList);
+		log.trace("Exiting EvalArgs");
+		return specList;
 	}
 }
 
-EvalSpec EvalArgsItem(Schema over, Map<String, EvalSpec> specs):
-{EvalSpec item;}
+ExpressionOperator EvalArgsItem(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
+{
+	ExpressionOperator item;
+	log.trace("Entering EvalArgsItem");
+}
 {
 	(
-	item = InfixExpr(over,specs)
-|	item = Star()
+	item = InfixExpr(over,specs,lp,input)
+|	<STAR> 
+	{
+		LOProject project = new LOProject(lp, new OperatorKey(scope, getNextId()), input, -1); 
+		project.setStar(true); 
+		item = project;
+		lp.add(project);
+		log.debug("EvalArgsItem: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp);
+	}
 	)
-	{return item;}
+	{log.trace("Exiting EvalArgsItem");return item;}
 }
 
 
-Schema Schema() : { Token t1; Schema item = null;}
+Schema.FieldSchema FieldSchema() : 
+{
+	Token t1; 
+	Schema item = null; 
+	Schema.FieldSchema fs = null; 
+	log.trace("Entering FieldSchema");
+}
 {
 	(
-	LOOKAHEAD(SchemaTuple()) item = SchemaTuple()
-|	LOOKAHEAD(SchemaBag()) item = SchemaBag()
-|   LOOKAHEAD(AtomSchema()) item = AtomSchema() 
+	LOOKAHEAD(SchemaTuple()) fs = SchemaTuple()
+|	LOOKAHEAD(SchemaBag()) fs = SchemaBag()
+|	LOOKAHEAD(AtomSchema()) fs = AtomSchema()
 	)
-	{return item;}
+	//{log.debug("Printing Aliases"); item.printAliases();log.trace("Exiting Schema");return item;}
+	{log.trace("Exiting FieldSchema");return fs;}
 }
 
-Schema AtomSchema() : {Token t1;}
+Schema.FieldSchema AtomSchema() : 
+{
+	Token t1;
+	log.trace("Entering AtomSchema");
+}
 {
-	(  ( t1 = <IDENTIFIER> ) { return new AtomSchema(t1.image); } )
+	(  ( t1 = <IDENTIFIER> ) 
+		{ 
+			log.debug("AtomSchema: " + t1.image);
+			
+			log.trace("Exiting AtomSchema");
+			return new Schema.FieldSchema(t1.image, DataType.BYTEARRAY); 
+		} 
+	)
 }
 
-TupleSchema SchemaTuple() : {Token t1 = null; TupleSchema list;}
+Schema.FieldSchema SchemaTuple() : 
+{
+	Token t1 = null; 
+	Schema list;
+	Schema.FieldSchema fs;
+	log.trace("Entering SchemaTuple");
+}
 { 
 	[( t1 = <IDENTIFIER> ) ":"] "(" list = TupleSchema() ")"	 
 	{
-		if (t1!=null)
-			list.setAlias(t1.image);
-		return list;
+		if (null != t1) {
+			log.debug("TUPLE alias " + t1.image);
+			fs = new Schema.FieldSchema(t1.image, list, DataType.TUPLE);
+		} else {
+			fs = new Schema.FieldSchema(null, list, DataType.TUPLE);
+		}
+		log.trace("Exiting SchemaTuple");
+		return fs;
 	} 
 }
 
-TupleSchema SchemaBag() : {Token t1 = null; TupleSchema list;}
+Schema.FieldSchema SchemaBag() : 
+{
+	Token t1 = null; 
+	Schema list;
+	Schema.FieldSchema fs;
+	log.trace("Entering SchemaBag");
+}
 { 
 	[( t1 = <IDENTIFIER> ) ":"] "[" list = TupleSchema() "]"	 
 	{
-		if (t1!=null)
-			list.setAlias(t1.image); 
-		return list;
+		if (null != t1) {
+			log.debug("BAG alias " + t1.image);
+			fs = new Schema.FieldSchema(t1.image, list, DataType.BAG);
+		} else {
+			fs = new Schema.FieldSchema(null, list, DataType.BAG);
+		}
+		log.trace("Exiting SchemaBag");
+		return fs;
 	} 
 }
 
 
-TupleSchema TupleSchema() : { Schema item = null; TupleSchema list = new TupleSchema(); }
+Schema TupleSchema() : 
+{
+	Schema item = null; 
+	Schema list = new Schema(); 
+	Schema.FieldSchema fs = null;
+	log.trace("Entering TupleSchema");
+}
 {
 	(	
-	(	item=Schema() { list.add(item); } 
-		( "," item=Schema() {list.add(item);} )* 
+	(	
+		fs = FieldSchema() {log.debug("Adding " + fs.alias + " to the list: " + list);list.add(fs);} 
+		( "," fs = FieldSchema() {log.debug("Adding " + fs.alias + " to the list: " + list);list.add(fs);})* 
 	)
 |		{}
 	)
-	{return list;}
+	{log.debug("Printing Aliases in TupleSchema"); list.printAliases();log.trace("Exiting TupleSchema");return list;}
 }
-	
-//CQ stuff
-
-EvalSpec PWindow() : {EvalSpec spec; int numTuples; double time;}
-{
-	( <WINDOW> 
-		( LOOKAHEAD(2)
-		  time = PTimeWindow() { spec = new TimeWindowSpec(WindowSpec.windowType.SLIDING, time); } |
-		  numTuples = PTupleWindow() { spec = new TupleWindowSpec(WindowSpec.windowType.SLIDING, numTuples);}
-		)
-	)	  
-	{return spec;}
-}	
-	
-double PTimeWindow() : {double n; Token t;}
-{
-	( t = <NUMBER> { n = Double.parseDouble(t.image); }
-		( <SECONDS> |
-		  <MINUTES> { n = n*60; } |
-		  <HOURS> { n = n * 3600; }
-		)
-	)
-	{return n;}
-}   
-
-int PTupleWindow() : {int n; Token t;}
-{
-	( t = <NUMBER> { try{ 
-						n = Integer.parseInt(t.image); 
-					 }catch(NumberFormatException e){
-					 	throw new ParseException("Only whole number tuple windows allowed.");
-					 }
-				   } 
-		 <TUPLES> 
-	)
-	{return n;}
-}   
-	
-
-
-
-
 
 
 // These the simple non-terminals that are shared across many
 
-String EvalFunction() : {String funcName;}
+String EvalFunction() : 
 {
-	funcName = QualifiedFunction()
-	{
-		try{
-			EvalFunc ef = (EvalFunc) pigContext.instantiateFuncFromAlias(funcName);
-		}catch (Exception e){
-			throw new ParseException(e.getMessage());
-		}
-		return funcName;
-	}
+	String funcName;
+	log.trace("Entering EvalFunction");
 }
-
-String FilterFunction() : {String funcName;}
 {
 	funcName = QualifiedFunction()
 	{
 		try{
-			FilterFunc ff = (FilterFunc) pigContext.instantiateFuncFromAlias(funcName);
+			LOUserFunc ef = (LOUserFunc) pigContext.instantiateFuncFromAlias(funcName);
 		}catch (Exception e){
-			throw new ParseException(funcName + " is not a valid filter function");
+			throw new ParseException(e.getMessage());
 		}
+		
+		log.trace("Exiting EvalFunction");
 		return funcName;
-	}	
+	}
 }
 
-
 /**
  * Bug 831620 - '$' support
  */
@@ -1017,113 +1793,180 @@
 /**
  * Bug 831620 - '$' support
  */
-String QualifiedFunction() #void : {Token t1;StringBuffer s=new StringBuffer();}
+String QualifiedFunction() #void : {Token t1;StringBuffer s=new StringBuffer(); log.trace("Entering QualifiedFunction");}
 {
 	((t1=<IDENTIFIER> { s.append(t1.image);}
 	 (("." t1=<IDENTIFIER> {s.append("." + t1.image);})| 
 	  ("$" t1=<IDENTIFIER> {s.append("$" + t1.image);}))*)) 
-	 {return s.toString();}
+	 {
+	 	log.debug("QualifiedFunction: " + s.toString());
+	 	log.trace("Exiting QualifiedFunction"); 
+	 	return s.toString();
+	 }
 }
 
 
 // If there is one time it may not be bracketed, but if multiple, they must be bracketed
-ProjectSpec BracketedSimpleProj(Schema over) : {EvalSpec es; int i; ProjectSpec spec = null;}
+ExpressionOperator BracketedSimpleProj(Schema over, LogicalPlan lp, LogicalOperator eOp) : 
+{
+	ExpressionOperator es; 
+	int i; 
+	ExpressionOperator spec = null;
+	log.trace("Entering BracketedSimpleProj");
+	log.debug("eOp: " + eOp.getClass().getName());
+}
 {
 	(
-	es = ColOrSpec(over,null) {spec = (ProjectSpec) es;} 
-|	("(" spec = SimpleProj(over) ")")	
+	spec = ColOrSpec(over,null,lp,eOp) 
+|	("(" spec = SimpleProj(over,lp,eOp) ")")	
+	
 	)
-	{return spec;}	
+	{log.trace("Exiting BracketedSimpleProj");return spec;}	
 }
 
-ProjectSpec SimpleProj(Schema over): 
-{EvalSpec i = null; ArrayList<Integer> colList = new ArrayList<Integer>();}
+ExpressionOperator SimpleProj(Schema over, LogicalPlan lp, LogicalOperator eOp): 
 {
-	i = ColOrSpec(over,null) {colList.add(((ProjectSpec)i).getCol());}	
-		("," i = ColOrSpec(over, null) {colList.add(((ProjectSpec)i).getCol());})*
-	{return new ProjectSpec(colList);}
+	int i; 
+	ArrayList<Integer> colList = new ArrayList<Integer>();
+	log.trace("Entering SimpleProj");
 }
-
-
-//Just a simple list of projection items
-GenerateSpec SimpleArgs(Schema over) : {EvalSpec i = null; ArrayList<EvalSpec> specList = new ArrayList<EvalSpec>();}
 {
-	(
-	(
-	i = SimpleArgsItem(over) {specList.add(i);}	
-		("," i = SimpleArgsItem(over) {specList.add(i);})*
-	)
-	| {}
-	)
+	i = ColName(over) {colList.add(i);}	
+		("," i = ColName(over) {colList.add(i);})*
 	{
-		if (specList.isEmpty())
-			return null;
-		else	
-			return new GenerateSpec(specList);
-	}	
+		ExpressionOperator project = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, colList);
+		lp.add(project);
+		log.debug("SimpleProj: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp);
+		log.trace("Exiting SimpleProj");
+		return project;
+	}
 }
 
-EvalSpec SimpleArgsItem(Schema over):
-{EvalSpec item;}
-{
-	(
-	item = Const()
-|	item = ColOrSpec(over,null)
-|	item = Star()
-	)
-	{return item;}
-}		
-
 
-StarSpec Star() : {Token t1; StarSpec spec;}
+ExpressionOperator Const(LogicalPlan lp) : 
 {
-	t1=<STAR>
-	{
-		spec = new StarSpec();
-		spec.setFlatten(true);
-		return spec;
-	}	
+	Token t1; 
+	String s;
+	log.trace("Entering Const");
 }
-
-EvalSpec Const() : {Token t1; String s;}
 {
 	(
 	t1=<QUOTEDSTRING> {s = unquote(t1.image);}
 |	t1 = <NUMBER> {s = t1.image;}
 	)
-	{return new ConstSpec(s);}
+	{
+		ExpressionOperator lConst = new LOConst(lp, new OperatorKey(scope, getNextId()), s);
+		lp.add(lConst);
+		log.debug("Const: Added operator " + lConst.getClass().getName() + " " + lConst + " to logical plan " + lp);
+		log.trace("Exiting Const");
+		return lConst;
+	}
 }
 
-EvalSpec ColOrSpec(Schema over, Map<String, EvalSpec> specs) : 
-{EvalSpec spec;}
+ExpressionOperator ColOrSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator eOp) : 
+{
+	ExpressionOperator spec;
+	log.trace("Entering ColOrSpec");
+}
 {
 	(
-	spec = DollarVar()
-|	spec = AliasFieldOrSpec(over,specs)
+	spec = DollarVar(lp, eOp)
+|	spec = AliasFieldOrSpec(over,specs,lp,eOp)
+
 	)
 	{
+		log.trace("Exiting ColOrSpec");
 		return spec;
 	}
 }
 
-ProjectSpec DollarVar() : {Token t1;}
+ExpressionOperator DollarVar(LogicalPlan lp, LogicalOperator eOp) : 
+{
+	Token t1;
+	log.trace("Entering DollarVar");
+}
 {
 	t1=<DOLLARVAR>	
-	{return new ProjectSpec(undollar(t1.image));}
+	{
+		log.debug("Token: " + t1.image);
+		ExpressionOperator project = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, undollar(t1.image));
+		try {
+			log.debug("eOp: " + eOp.getClass().getName() + " " + eOp);
+			lp.add(project);
+			log.debug("DollarVar: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp);
+		} catch (Exception planException) {
+			ParseException parseException = new ParseException(planException.getMessage());
+			throw parseException;
+		}
+		log.trace("Exiting DollarVar");
+		return project;
+	}
 }
 
-EvalSpec AliasFieldOrSpec(Schema over, Map<String, EvalSpec> specs) : {Token t1;}
+ExpressionOperator AliasFieldOrSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator eOp) : 
+{
+	Token t1;
+	LogicalPlan projectInputPlan = new LogicalPlan();
+	log.trace("Entering AliasFieldOrSpec");
+}
 {
 	(t1=<GROUP> | t1=<IDENTIFIER>) 
-	{	int i; EvalSpec item = null;
-		if (specs!=null)
-			item = specs.get(t1.image);
+	{	
+		log.debug("Token: " + t1.image);
+		if(null != eOp) log.debug("eOp: " + eOp.getClass().getName());
+		int i; 
+		ExpressionOperator item = null;
+		if (specs!=null) {
+			log.debug("specs != null");
+			LogicalOperator op = specs.get(t1.image);
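+			// the identifier names an operator visible in this scope: project all of its columns, and inside a GENERATE also record it as an input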
+			if(null != op) {
+				log.debug("Alias: " + op.getAlias());
+				
+				item = new LOProject(lp, new OperatorKey(scope, getNextId()), op, -1);
+				((LOProject)item).setStar(true);
+				log.debug("Set star to true");
+				
+				if(insideGenerate()) {
+					log.debug("AliasFieldOrSpec: Inside generate");
+					addGenerateInput(op);
+				}
+				try {
+					lp.add(item);
+					log.debug("AliasFieldOrSpec: Added operator " + item.getClass().getName() + " " + item + " to logical plan " + lp);
+				} catch (Exception planException) {
+					ParseException parseException = new ParseException(planException.getMessage());
+					throw parseException;
+				}
+			}
+		}
 		
 		if (item == null){
-			if ( over == null ||  (i = over.colFor(t1.image)) == -1)
-				throw new ParseException("Invalid alias: " + t1.image + " in " + over); 
-			item = new ProjectSpec(i);		
+			log.debug("item == null");
+			if (null == over) log.debug("over is null");
+			if ( over == null ||  (i = over.getPosition(t1.image)) == -1) {
+				log.debug("Invalid alias: " + t1.image + " in " + over);
+				if(null != over) {
+					log.debug("Printing out the aliases in the schema");
+					over.printAliases();
+				}
+				throw new ParseException("Invalid alias: " + t1.image + " in " + over);
+			}
+			log.debug("Position of " + t1.image + " = " + i);
+			if(null != over) {
+				log.debug("Printing out the aliases in the schema");
+				over.printAliases();
+			}	
+			item = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, i);
+			try {
+				lp.add(item);
+				log.debug("AliasFieldOrSpec: Added operator " + item.getClass().getName() + " " + item + " to logical plan " + lp);
+			} catch (Exception planException) {
+				ParseException parseException = new ParseException(planException.getMessage());
+				throw parseException;
+			}
 		}
+		
+		log.trace("Exiting AliasFieldOrSpec");
 		return item;
 	}
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=654629&r1=654628&r2=654629&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Thu May  8 14:25:22 2008
@@ -22,9 +22,17 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.Collection;
+import java.io.IOException;
 
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.plan.MultiMap;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+
 
 public class Schema {
 
@@ -40,26 +48,35 @@
         public byte type;
 
         /**
-         * If this is a tuple itself, it can have a schema.  Otherwise this
-         * field must be null.
+         * If this is a tuple itself, it can have a schema. Otherwise this field
+         * must be null.
          */
         public Schema schema;
+        
+        private static Log log = LogFactory.getLog(Schema.FieldSchema.class);
 
         /**
          * Constructor for any type.
-         * @param a Alias, if known.  If unknown leave null.
-         * @param t Type, using codes from {@link org.apache.pig.data.DataType}.
+         * 
+         * @param a
+         *            Alias, if known. If unknown leave null.
+         * @param t
+         *            Type, using codes from
+         *            {@link org.apache.pig.data.DataType}.
          */
         public FieldSchema(String a, byte t) {
             alias = a;
-            type = t ;
-            schema = null;
+            type = t;
+            schema = null;            
         }
 
         /**
          * Constructor for tuple fields.
-         * @param a Alias, if known.  If unknown leave null.
-         * @param s Schema of this tuple.
+         * 
+         * @param a
+         *            Alias, if known. If unknown leave null.
+         * @param s
+         *            Schema of this tuple.
          */
         public FieldSchema(String a, Schema s) {
             alias = a;
@@ -67,6 +84,29 @@
             schema = s;
         }
 
+        /**
+         * Constructor for tuple and bag fields.
+         * 
+         * @param a
+         *            Alias, if known. If unknown leave null.
+         * @param s
+         *            Schema of this tuple.
+         * @param t
+         *            Type, using codes from
+         *            {@link org.apache.pig.data.DataType}.
+         * 
+         */
+        public FieldSchema(String a, Schema s, byte t)  throws FrontendException {
+            alias = a;
+            schema = s;
+            log.debug("t: " + t + " Bag: " + DataType.BAG + " tuple: " + DataType.TUPLE);
+            if ((null != s) && (t != DataType.BAG) && (t != DataType.TUPLE)) {
+                throw new FrontendException("Only a BAG or TUPLE can have schemas. Got "
+                        + DataType.findTypeName(t));
+            }
+            type = t;
+        }
+
         @Override
         public boolean equals(Object other) {
             if (!(other instanceof FieldSchema)) return false;
@@ -78,6 +118,8 @@
 
             return true;
         }
+
+        // TODO Need to add hashCode().
         
         /***
          * Compare two field schema for equality
@@ -120,6 +162,14 @@
 
     private List<FieldSchema> mFields;
     private Map<String, FieldSchema> mAliases;
+    private MultiMap<FieldSchema, String> mFieldSchemas;
+    private static Log log = LogFactory.getLog(Schema.class);
+ 
+    public Schema() {
+        mFields = new ArrayList<FieldSchema>();
+        mAliases = new HashMap<String, FieldSchema>();
+        mFieldSchemas = new MultiMap<FieldSchema, String>();
+    }
 
     /**
      * @param fields List of field schemas that describes the fields.
@@ -127,8 +177,14 @@
     public Schema(List<FieldSchema> fields) {
         mFields = fields;
         mAliases = new HashMap<String, FieldSchema>(fields.size());
-        for (FieldSchema fs : fields) {
-            if (fs.alias != null) mAliases.put(fs.alias, fs);
+        mFieldSchemas = new MultiMap<FieldSchema, String>();
+        for (FieldSchema fs : fields) {
+            if (fs.alias != null) {
+                mAliases.put(fs.alias, fs);
+                mFieldSchemas.put(fs, fs.alias);
+            }
         }
     }
 
@@ -139,8 +195,13 @@
     public Schema(FieldSchema fieldSchema) {
         mFields = new ArrayList<FieldSchema>(1);
         mFields.add(fieldSchema);
+        mAliases = new HashMap<String, FieldSchema>(1);
+        mFieldSchemas = new MultiMap<FieldSchema, String>();
         if (fieldSchema.alias != null) {
             mAliases.put(fieldSchema.alias, fieldSchema);
+            mFieldSchemas.put(fieldSchema, fieldSchema.alias);
         }
     }
 
@@ -155,15 +216,18 @@
 
     /**
      * Given a field number, find the associated FieldSchema.
-     * @param fieldNum Field number to look up.
+     * 
+     * @param fieldNum
+     *            Field number to look up.
      * @return FieldSchema for this field.
-     * @throws ParseException if the field number exceeds the number of
-     * fields in the tuple.
+     * @throws ParseException
+     *             if the field number exceeds the number of fields in the
+     *             tuple.
      */
     public FieldSchema getField(int fieldNum) throws ParseException {
         if (fieldNum >= mFields.size()) {
-            throw new ParseException("Attempt to fetch field " + fieldNum +
-                " from tuple of size " + mFields.size());
+            throw new ParseException("Attempt to fetch field " + fieldNum
+                    + " from tuple of size " + mFields.size());
         }
 
         return mFields.get(fieldNum);
@@ -171,6 +235,7 @@
 
     /**
      * Find the number of fields in the schema.
+     * 
      * @return number of fields.
      */
     public int size() {
@@ -198,9 +263,39 @@
         for (int j = 0; i.hasNext(); j++) {
             FieldSchema otherFs = i.next();
             FieldSchema ourFs = mFields.get(j);
-            if (otherFs.alias != null) ourFs.alias = otherFs.alias; 
-            if (otherFs.type != DataType.UNKNOWN) ourFs.type = otherFs.type; 
-            if (otherFs.schema != null) ourFs.schema = otherFs.schema; 
+            log.debug("ourFs: " + ourFs + " otherFs: " + otherFs);
+            if (otherFs.alias != null) {
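+                // adopt the alias from the other schema: drop our existing alias from mAliases and mFieldSchemas before registering the new one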
+                log.debug("otherFs.alias: " + otherFs.alias);
+                if (ourFs.alias != null) {
+                    log.debug("Removing ourFs.alias: " + ourFs.alias);
+                    mAliases.remove(ourFs.alias);
+                    Collection<String> aliases = mFieldSchemas.get(ourFs);
+                    List<String> listAliases = new ArrayList<String>();
+                    for(String alias: aliases) {
+                        listAliases.add(new String(alias));
+                    }
+                    for(String alias: listAliases) {
+                        log.debug("Removing alias " + alias + " from multimap");
+                        mFieldSchemas.remove(ourFs, alias);
+                    }
+                }
+                ourFs.alias = otherFs.alias;
+                log.debug("Setting alias to: " + otherFs.alias);
+                mAliases.put(ourFs.alias, ourFs);
+                if(null != ourFs.alias) {
+                    mFieldSchemas.put(ourFs, ourFs.alias);
+                }
+            }
+            if (otherFs.type != DataType.UNKNOWN) {
+                ourFs.type = otherFs.type;
+                log.debug("Setting type to: "
+                        + DataType.findTypeName(otherFs.type));
+            }
+            if (otherFs.schema != null) {
+                ourFs.schema = otherFs.schema;
+                log.debug("Setting schema to: " + otherFs.schema);
+            }
+
         }
     }
 
@@ -219,7 +314,67 @@
         }
         return true;
     }
+
+    // TODO add hashCode()
+
+    public void add(FieldSchema f) {
+        mFields.add(f);
+        if (null != f.alias) {
+            mAliases.put(f.alias, f);
+            mFieldSchemas.put(f, f.alias);
+        }
+    }
+ 
+    /**
+     * Given an alias, find the associated position of the field schema.
+     * 
+     * @param alias
+     *            alias of the FieldSchema.
+     * @return position of the FieldSchema.
+     */
+    public int getPosition(String alias) {
+
+        FieldSchema fs = getField(alias);
+
+        if (null == fs) {
+            return -1;
+        }
+        
+        log.debug("fs: " + fs);
+        int index = -1;
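+        // compare by reference rather than equals() so we find the exact FieldSchema instance returned by getField(alias)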
+        for(int i = 0; i < mFields.size(); ++i) {
+            log.debug("mFields(" + i + "): " + mFields.get(i) + " alias: " + mFields.get(i).alias);
+            if(fs == mFields.get(i)) {index = i;}
+        }
+
+        log.debug("index: " + index);
+        return index;
+        //return mFields.indexOf(fs);
+    }
+
+    public void addAlias(String alias, FieldSchema fs) {
+        if(null != alias) {
+            mAliases.put(alias, fs);
+            if(null != fs) {
+                mFieldSchemas.put(fs, alias);
+            }
+        }
+    }
+
+    public Set<String> getAliases() {
+        return mAliases.keySet();
+    }
+
+    public void printAliases() {
+        Set<String> aliasNames = mAliases.keySet();
+        for (String alias : aliasNames) {
+            log.debug("Schema Alias: " + alias);
+        }
+    }
     
+    public List<FieldSchema> getFields() {
+        return mFields;
+    }
+
     /**
      * Recursively compare two schemas for equality
      * @param schema