You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2011/05/07 02:15:44 UTC

svn commit: r1100420 [12/19] - in /pig/branches/branch-0.9: ./ src/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/optimizer/ src/org/apach...

Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Sat May  7 00:15:40 2011
@@ -1,4252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * JavaCC file
- * This file lists the grammar for PIG Latin.
- * QueryParser program outputs a ParseTree given a Valid Pig Latin Query
- */
-options {
-  // Generate non-static functions
-  STATIC = false;
-  // Case is ignored in keywords
-  IGNORE_CASE = true;
-  JAVA_UNICODE_ESCAPE = true;
-}
-
-PARSER_BEGIN(QueryParser)
-package org.apache.pig.impl.logicalLayer.parser;
-import java.io.*;
-import java.util.*;
-import java.net.URI;
-import java.lang.Class;
-import java.net.URISyntaxException;
-import java.lang.reflect.Type;
-import org.apache.pig.impl.logicalLayer.*;
-import org.apache.pig.impl.logicalLayer.schema.*;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.ExecType;
-import org.apache.pig.impl.io.*;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.RANDOM;
-import org.apache.pig.impl.builtin.GFAny;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
-import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
-import org.apache.pig.impl.plan.CompilationMessageCollector;
-import org.apache.pig.impl.util.StringUtils;
-import org.apache.pig.StreamToPig;
-import org.apache.pig.PigToStream;
-import org.apache.pig.builtin.PigStreaming;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.ComparisonFunc;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.datastorage.ContainerDescriptor;
-import org.apache.pig.backend.datastorage.ElementDescriptor;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pig.impl.util.LinkedMultiMap;
-import org.apache.pig.impl.builtin.ReadScalars;
-
-public class QueryParser {
-	private PigContext pigContext;
-	private Map<LogicalOperator, LogicalPlan> aliases;
-	private Map<OperatorKey, LogicalOperator> opTable;
-	private String scope;
-	private NodeIdGenerator nodeIdGen;
-	//a map of alias to logical operator for a quick lookup
-	private Map<String, LogicalOperator> mapAliasOp;
-	private static Log log = LogFactory.getLog(QueryParser.class);
-	private boolean bracketed = false;
-	private boolean scalarFound = false;
-    private Map<String, String> fileNameMap;
-	
-	private long getNextId() {
-		return nodeIdGen.getNextNodeId(scope);
-	}
-
-	public QueryParser(InputStream in,
-					   PigContext pigContext, 
-					   String scope, 
-					   Map<LogicalOperator, LogicalPlan> aliases,
-					   Map<OperatorKey, LogicalOperator> opTable,
-					   Map<String, LogicalOperator> aliasOp,
-                                           Map<String, String> fileNameMap) {
-		this(in);
-		this.pigContext = pigContext;
-		this.aliases = aliases;
-		this.opTable = opTable;
-		this.scope = scope;
-		this.nodeIdGen = NodeIdGenerator.getGenerator();
-		this.mapAliasOp = aliasOp;
-        this.fileNameMap = fileNameMap;
-	}
-	
-    public QueryParser(InputStream in,
-            PigContext pigContext, 
-            String scope, 
-            Map<LogicalOperator, LogicalPlan> aliases,
-            Map<OperatorKey, LogicalOperator> opTable,
-            Map<String, LogicalOperator> aliasOp,
-            int start,
-            Map<String, String> fileNameMap) {
-        this(in, pigContext, scope, aliases, opTable, aliasOp, fileNameMap);
-        token_source.input_stream.line = start;
-    }
-
-	public class CogroupInput {
-		public LogicalOperator op;
-		public ArrayList<LogicalPlan> plans;
-		public boolean isInner;
-	}
-	    
-    /**
-     * Strips one pair of enclosing single quotes from a string.
-     *
-     * @param str the text to unwrap; must not be null
-     * @return str without its first and last character when it has at least
-     *         two characters and both of those are single quotes; otherwise
-     *         str unchanged
-     */
-    private static String removeQuotes(String str) {
-        // A lone "'" both starts and ends with a quote; without the length
-        // check substring(1, 0) would throw StringIndexOutOfBoundsException.
-        if (str.length() >= 2 && str.startsWith("'") && str.endsWith("'")) {
-            return str.substring(1, str.length() - 1);
-        }
-        return str;
-    }
-
-    public static LogicalPlan generateStorePlan(String scope,
-                                                LogicalPlan readFrom,
-                                                String fileName,
-                                                String func,
-                                                LogicalOperator input,
-                                                String alias, PigContext pigContext) throws FrontendException {
-
-        if (func == null) {
-            func = PigStorage.class.getName();
-        }
-
-        fileName = removeQuotes(fileName);
-
-        long storeNodeId = NodeIdGenerator.getGenerator().getNextNodeId(scope);
-
-        LogicalPlan storePlan = new LogicalPlan();
-
-        LogicalOperator store;
-        FuncSpec funcSpec = new FuncSpec(func);
-        FileSpec fileSpec = new FileSpec(fileName, funcSpec);
-        try {
-            store = new LOStore(storePlan, new OperatorKey(scope, storeNodeId),
-                    fileSpec, alias);
-        } catch (IOException ioe) {
-            throw new FrontendException(ioe.getMessage(), ioe);
-        }
-        
-        Object obj = PigContext.instantiateFuncFromSpec(funcSpec);
-        StoreFuncInterface stoFunc = (StoreFuncInterface)obj;
-        stoFunc.setStoreFuncUDFContextSignature(LOStore.constructSignature(alias, fileName, funcSpec));
-
-        try {
-            stoFunc.relToAbsPathForStoreLocation(fileName, getCurrentDir(pigContext));
-        } catch (IOException ioe) {
-            FrontendException e = new FrontendException(ioe.getMessage(), ioe);
-            throw e;
-        }
-        
-        try {
-	        storePlan.add(store);
-	        storePlan.add(input);
-	        storePlan.connect(input, store);
-	        attachPlan(storePlan, input, readFrom, new HashMap<LogicalOperator, Boolean>());
-        } catch (ParseException pe) {
-            throw new FrontendException(pe.getMessage(), pe);
-        }
-	    
-        if (storePlan.getRoots().size() == 0) throw new RuntimeException("Store plan has no roots!");
-        return storePlan;
-    }
-
-    static String unquote(String s) {
-		return StringUtils.unescapeInputString(s.substring(1, s.length()-1)) ;
-	}
-	
-	static int undollar(String s) {
-		return Integer.parseInt(s.substring(1, s.length()));	
-	}
-	    
-    static Path getCurrentDir(PigContext pigContext) throws IOException {
-        DataStorage dfs = pigContext.getDfs();
-        ContainerDescriptor desc = dfs.getActiveContainer();
-        ElementDescriptor el = dfs.asElement(desc);
-        return new Path(el.toString());
-    }
-
-	LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp, LOCogroup.GROUPTYPE type) throws ParseException, PlanException{
-		
-		log.trace("Entering parseCogroup");
-		log.debug("LogicalPlan: " + lp);
-		
-		int n = gis.size();
-		log.debug("Number of cogroup inputs = " + n);
-		
-		ArrayList<LogicalOperator> los = new ArrayList<LogicalOperator>();
-		ArrayList<ArrayList<LogicalPlan>> plans = new ArrayList<ArrayList<LogicalPlan>>();
-		MultiMap<LogicalOperator, LogicalPlan> groupByPlans = new MultiMap<LogicalOperator, LogicalPlan>();
-		//Map<LogicalOperator, LogicalPlan> groupByPlans = new HashMap<LogicalOperator, LogicalPlan>();
-		boolean[] isInner = new boolean[n];
-		
-		int arity = gis.get(0).plans.size();
-		
-		for (int i = 0; i < n ; i++){
-			
-			CogroupInput gi = gis.get(i);
-			los.add(gi.op);
-			ArrayList<LogicalPlan> planList = gi.plans;
-			plans.add(gi.plans);
-			int numGrpByOps = planList.size();
-			log.debug("Number of group by operators = " + numGrpByOps);
-
-			if(arity != numGrpByOps) {
-				throw new ParseException("The arity of the group by columns do not match.");
-			}
-			for(int j = 0; j < numGrpByOps; ++j) {
-			    groupByPlans.put(gi.op, planList.get(j));
-				for(LogicalOperator root: planList.get(j).getRoots()) {
-					log.debug("Cogroup input plan root: " + root);
-				}
-			}
-			isInner[i] = gi.isInner;
-		}
-		
-		LogicalOperator cogroup = new LOCogroup(lp, new OperatorKey(scope, getNextId()), groupByPlans, type, isInner);
-		lp.add(cogroup);
-		log.debug("Added operator " + cogroup.getClass().getName() + " object " + cogroup + " to the logical plan " + lp);
-		
-		for(LogicalOperator op: los) {
-			lp.connect(op, cogroup);
-			log.debug("Connected operator " + op.getClass().getName() + " to " + cogroup.getClass().getName() + " in the logical plan");
-		}
-
-		log.trace("Exiting parseCogroup");
-		return cogroup;
-	}
-	
-    private LogicalOperator parseUsingForGroupBy(String modifier, ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
-
-      if(modifier.equalsIgnoreCase("collected")){
-            if (gis.size() != 1) {
-                throw new ParseException("Collected group is only supported for single input");  
-                }
-            if (!isColumnProjectionsOrStar(gis.get(0))) {
-                throw new ParseException("Collected group is only supported for columns or star projection");
-                }
-            LogicalOperator cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.COLLECTED);
-            cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
-            return cogroup;
-        }
-
-        else if (modifier.equalsIgnoreCase("regular")){
-            LogicalOperator cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
-            cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
-            return cogroup;
-        }
-
-        else if (modifier.equalsIgnoreCase("merge")){
-            LogicalOperator cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.MERGE);
-            cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
-            return cogroup;
-        }
-        
-        else{
-            throw new ParseException("Only COLLECTED, REGULAR or MERGE are valid GROUP modifiers.");
-        }
-    }
-    
-	/**
-	 * Join parser. 
-	 */
-	LogicalOperator parseJoin(ArrayList<CogroupInput> gis, LogicalPlan lp, LOJoin.JOINTYPE jt) throws ParseException, PlanException{
-		log.trace("Entering parseJoin");
-        
-		int n = gis.size();
-
-		if (jt == LOJoin.JOINTYPE.SKEWED && n != 2) {
-			throw new ParseException("Skewed join can only be applied for 2-way joins");
-		}
-		if (jt == LOJoin.JOINTYPE.MERGE && n != 2) {
-            throw new ParseException("Merge join can only be applied for 2-way joins");
-        }
-        
-		ArrayList<LogicalOperator> los = new ArrayList<LogicalOperator>();
-		ArrayList<ArrayList<LogicalPlan>> plans = new ArrayList<ArrayList<LogicalPlan>>();
-		MultiMap<LogicalOperator, LogicalPlan> joinPlans = new LinkedMultiMap<LogicalOperator, LogicalPlan>();
-		boolean[] isInner = new boolean[n];
-		
-		int arity = gis.get(0).plans.size();
-		
-		for (int i = 0; i < n ; i++){
-			
-			CogroupInput gi = gis.get(i);
-			los.add(gi.op);
-			ArrayList<LogicalPlan> planList = gi.plans;
-			plans.add(gi.plans);
-			int numJoinOps = planList.size();
-			log.debug("Number of join operators = " + numJoinOps);
-
-			if(arity != numJoinOps) {
-				throw new ParseException("The arity of the join columns do not match.");
-			}
-			for(int j = 0; j < numJoinOps; ++j) {
-			    joinPlans.put(gi.op, planList.get(j));
-				for(LogicalOperator root: planList.get(j).getRoots()) {
-					log.debug("Join input plan root: " + root);
-				}
-			}
-			isInner[i] = gi.isInner;
-		}
-
-		LogicalOperator loj  = new LOJoin(lp, new OperatorKey(scope, getNextId()), joinPlans, jt, isInner);
-		lp.add(loj);
-		log.debug("Added operator " + loj.getClass().getName() + " object " + loj + " to the logical plan " + lp);
-		
-		for(LogicalOperator op: los) {
-			lp.connect(op, loj);
-			log.debug("Connected operator " + op.getClass().getName() + " to " + loj.getClass().getName() + " in the logical plan");
-		}
-
-		log.trace("Exiting parseJoin");
-		return loj;
-	}
-
-	/**
-	 * The join operator is translated to foreach 
-	 */
-	LogicalOperator rewriteJoin(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
-		
-		log.trace("Entering rewriteJoin");
-		log.debug("LogicalPlan: " + lp);
-		int n = gis.size();
-		ArrayList<ExpressionOperator> flattenedColumns = new ArrayList<ExpressionOperator>();
-		ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>();
-		ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
-		
-		/*
-		 * Construct the projection operators required for the generate
-		 * Make sure that the operators are flattened
-		 */
-
-
-	
-		//Construct the cogroup operator and add it to the logical plan
-        // for join, inner is true for all the inputs involved in the join
-        for (int i = 0; i < n; i++) {
-			(gis.get(i)).isInner = true;
-        }
-		LogicalOperator cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
-		lp.add(cogroup);
-		log.debug("Added operator " + cogroup.getClass().getName() + " to the logical plan");
-		
-		for (int i = 0; i < n; i++) {
-			LogicalPlan projectPlan = new LogicalPlan(); 
-			LogicalOperator projectInput = cogroup;
-			ExpressionOperator column = new LOProject(projectPlan, new OperatorKey(scope, getNextId()), projectInput, i+1);
-			flattenList.add(true);
-			flattenedColumns.add(column);
-			projectPlan.add(column);
-            if(projectInput instanceof ExpressionOperator) {
-			    projectPlan.add(projectInput);
-			    projectPlan.connect(projectInput, column);
-            }
-			log.debug("parseCogroup: Added operator " + column.getClass().getName() + " " + column + " to logical plan " + projectPlan);
-			generatePlans.add(projectPlan);
-		}
-
-		
-		
-		/*
-		 * Construct the foreach operator from the foreach logical plan
-		 * Add the foreach operator to the top level logical plan
-		 */
-		LogicalOperator foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList);
-		lp.add(foreach);
-		log.debug("Added operator " + foreach.getClass().getName() + " to the logical plan");
-		lp.connect(cogroup, foreach);
-		log.debug("Connected operator " + cogroup.getClass().getName() + " to opeator " + foreach.getClass().getName() + " in the logical plan " + lp);
-		
-		log.trace("Exiting rewriteJoin");
-		return foreach;
-	}
-
-    private LogicalOperator parseUsingForJoin(String modifier, ArrayList<CogroupInput> gis,
-                LogicalPlan lp, boolean isFullOuter, boolean isRightOuter, boolean isOuter) throws
-                ParseException, PlanException{
-
-              if (modifier.equalsIgnoreCase("repl") || modifier.equalsIgnoreCase("replicated")) {
-              if(isFullOuter || isRightOuter) {
-                  throw new ParseException("Replicated join does not support (right|full) outer joins");
-              }
-                    LogicalOperator joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED); 
-                    joinOp.pinOption(LOJoin.OPTION_JOIN);
-                    return joinOp; 
-            }
-             else if (modifier.equalsIgnoreCase("hash") || modifier.equalsIgnoreCase("default")) {
-                    LogicalOperator joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.HASH);
-                    joinOp.pinOption(LOJoin.OPTION_JOIN);
-                    return joinOp;
-            }
-            else if (modifier.equalsIgnoreCase("skewed")) {
-                    LogicalOperator joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED);
-                    joinOp.pinOption(LOJoin.OPTION_JOIN);
-                    return joinOp;
-            }
-             else if (modifier.equalsIgnoreCase("merge")) {
-                    LogicalOperator joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE);
-                    joinOp.pinOption(LOJoin.OPTION_JOIN);
-                    return joinOp; 
-            }
-            else{
-                    throw new ParseException("Only REPL, REPLICATED, HASH, SKEWED and MERGE are vaild JOIN modifiers.");
-            }
-    }
-
-    void assertAtomic(LogicalOperator spec, boolean desiredAtomic) throws ParseException{
-		Boolean isAtomic = null;
-		if ( spec instanceof LOConst || 
-			(spec instanceof LOUserFunc &&
-                DataType.isAtomic(((LOUserFunc)spec).getType())))
-			isAtomic = true;
-		else if (spec instanceof LOUserFunc)
-			isAtomic = false;
-		
-		if (isAtomic != null && isAtomic != desiredAtomic){
-			if (desiredAtomic)
-				throw new ParseException("Atomic field expected but found non-atomic field");
-			else
-				throw new ParseException("Non-atomic field expected but found atomic field");
-		}
-	}					
-
-	 void addSplitOutput(LogicalPlan lp, LOSplit splitOp, String alias, LogicalPlan condPlan, int index) throws PlanException{
-		LogicalOperator splitOutput = new LOSplitOutput(lp, new OperatorKey(scope, getNextId()), index, condPlan);
-		splitOp.addOutput(splitOutput);
-		addAlias(alias, splitOutput);
-        splitOutput.setAlias(alias);
-        addLogicalPlan(splitOutput, lp);
-		
-		lp.add(splitOutput);
-		log.debug("Added alias: " + splitOutput.getAlias() + " class: " 
-			+ splitOutput.getClass().getName() + " to the logical plan");
-			
-		lp.connect(splitOp, splitOutput);
-		log.debug("Connected " + splitOp.getClass().getName() + " to class: "
-			+ splitOutput.getClass().getName() + " in the logical plan");
-		
-	 }
-	 
-	 void addAlias(String alias, LogicalOperator lOp) {
-	 	mapAliasOp.put(alias, lOp);
-	 }
-	 
-	 LogicalOperator getOp(String alias) {
-	 	return mapAliasOp.get(alias);
-	 }
-	 
-	 Set<String> getRemoteHosts(String absolutePath, String defaultHost) {
-	     String HAR_PREFIX = "hdfs-";
-	     Set<String> result = new HashSet<String>();
-	     String[] fnames = absolutePath.split(",");
-	     for (String fname: fnames) {
-	         // remove leading/trailing whitespace(s)
-	         fname = fname.trim();
-	         Path p = new Path(fname);
-	         URI uri = p.toUri();
-	         if(uri.isAbsolute()) {
-	             String scheme = uri.getScheme();
-	             if (scheme!=null && scheme.toLowerCase().equals("hdfs")||scheme.toLowerCase().equals("har")) {
-	                 if (uri.getHost()==null)
-	                     continue;
-	                 String thisHost = uri.getHost().toLowerCase();
-	                 if (scheme.toLowerCase().equals("har")) {
-	                     if (thisHost.startsWith(HAR_PREFIX)) {
-	                         thisHost = thisHost.substring(HAR_PREFIX.length());
-	                     }
-	                 }
-	                 if (!uri.getHost().isEmpty() && 
-	                         !thisHost.equals(defaultHost)) {
-	                     if (uri.getPort()!=-1)
-	                         result.add("hdfs://"+thisHost+":"+uri.getPort());
-	                     else
-	                         result.add("hdfs://"+thisHost);
-	                 }
-	             }
-	         }
-	     }
-	     return result;
-	 }
-
-    /**
-     * Adds the remote HDFS servers referenced by a location to the
-     * "mapreduce.job.hdfs-servers" property of the given Pig context.
-     *
-     * The default host is taken from the "fs.default.name" property; servers
-     * already listed in "mapreduce.job.hdfs-servers" are not added again. The
-     * property is written only when the resulting list is non-empty.
-     *
-     * @param absolutePath comma-separated locations to inspect
-     * @param pigContext context whose properties are read and updated
-     * @throws URISyntaxException if "fs.default.name" is not a valid URI
-     */
-    void setHdfsServers(String absolutePath, PigContext pigContext) throws URISyntaxException {
-        // Get native host. An unset fs.default.name is treated like a URI
-        // without a host instead of failing in new URI(null).
-        String defaultFS = (String)pigContext.getProperties().get("fs.default.name");
-        String defaultHost = null;
-        if (defaultFS != null) {
-            defaultHost = new URI(defaultFS).getHost();
-        }
-        defaultHost = (defaultHost == null) ? "" : defaultHost.toLowerCase();
-
-        Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultHost);
-
-        String hdfsServersString = (String)pigContext.getProperties().get("mapreduce.job.hdfs-servers");
-        if (hdfsServersString == null) hdfsServersString = "";
-        Set<String> knownServers = new HashSet<String>(Arrays.asList(hdfsServersString.split(",")));
-
-        // Append every remote host that is not listed yet, comma-separated.
-        StringBuilder servers = new StringBuilder(hdfsServersString);
-        for (String remoteHost : remoteHosts) {
-            if (knownServers.contains(remoteHost)) {
-                continue;
-            }
-            if (servers.length() > 0) {
-                servers.append(",");
-            }
-            servers.append(remoteHost);
-        }
-
-        if (servers.length() > 0) {
-            pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", servers.toString());
-        }
-    }
-    
-     // Check and set files to be automatically shipped for the given StreamingCommand
-     // Auto-shipping rules:
-     // 1. If the command begins with either perl or python assume that the 
-     //    binary is the first non-quoted string it encounters that does not 
-     //    start with dash - subject to restrictions in (2).
-     // 2. Otherwise, attempt to ship the first string from the command line as 
-     //    long as it does not come from /bin, /usr/bin, /usr/local/bin. 
-     //    It will determine that by scanning the path if an absolute path is 
-     //    provided or by executing "which". The paths can be made configurable 
-     //    via "set stream.skippath <paths>" option.
-     private static final String PERL = "perl";
-     private static final String PYTHON = "python";
-     /**
-      * Picks the auto-ship candidate from a tokenized streaming command and
-      * passes it to checkAndShip.
-      *
-      * For a perl or python command the candidate is the first later argument
-      * that neither starts with '-' nor is single-quoted; for any other
-      * command it is the first argument itself.
-      *
-      * @param command streaming command that receives the path to ship
-      * @param argv tokenized command line; may be empty
-      * @throws ParseException propagated from checkAndShip
-      */
-     private void checkAutoShipSpecs(StreamingCommand command, String[] argv) 
-     throws ParseException {
-        // An empty command line names no executable, so there is nothing to
-        // ship; reading argv[0] would throw ArrayIndexOutOfBoundsException.
-        if (argv == null || argv.length == 0) {
-            return;
-        }
-
-        // Candidate for auto-ship
-        String arg0 = argv[0];
-
-        if (!arg0.equalsIgnoreCase(PERL) && !arg0.equalsIgnoreCase(PYTHON)) {
-            // Ship the first argument if it can be ...
-            checkAndShip(command, arg0);
-            return;
-        }
-
-        // The command is perl or python: use the first non-option and
-        // non-quoted string as the candidate
-        for (int i = 1; i < argv.length; ++i) {
-            if (!argv[i].startsWith("-") && !isQuotedString(argv[i])) {
-                checkAndShip(command, argv[i]);
-                break;
-            }
-        }
-     }
-     
-     private void checkAndShip(StreamingCommand command, String arg) 
-     throws ParseException {
-     	// Don't auto-ship if it is an absolute path...
-     	if (arg.startsWith("/")) {
-     		return;
-     	}
-     	
-     	// $ which arg
-     	String argPath = which(arg);
-     	if (argPath != null && !inSkipPaths(argPath)) {
-     		try {
-     		    command.addPathToShip(argPath);
-     		} catch(IOException e) {
-                ParseException pe = new ParseException(e.getMessage());
-                pe.initCause(e);
-                throw pe;
-            }
-     	}
-     	 
-     }
-
-     /**
-      * Tells whether a token is wrapped in single quotes.
-      *
-      * @param s token to inspect; must not be null
-      * @return true when s has at least two characters and both its first and
-      *         its last character are single quotes
-      */
-     private boolean isQuotedString(String s) {
-        // The length guard keeps charAt(0) safe on "" and stops a lone "'"
-        // from serving as both the opening and the closing quote.
-        return s.length() >= 2
-            && s.charAt(0) == '\'' && s.charAt(s.length() - 1) == '\'';
-     }
-     
-     // Check if file is in the list paths to be skipped 
-     private boolean inSkipPaths(String file) {
-     	for (String skipPath : pigContext.getPathsToSkip()) {
-     		if (file.startsWith(skipPath)) {
-     			return true;
-     		}
-     	}
-        return false;
-     }
-
-
-     /**
-      * Resolves a command name by running the external "which" program.
-      *
-      * This is a best-effort lookup: any failure to start, read from or wait
-      * for the process yields null rather than an exception.
-      *
-      * @param file command name handed to "which"
-      * @return the first line printed by "which" when it exits with status 0,
-      *         otherwise null
-      */
-     private static String which(String file) {
-        Process process = null;
-        BufferedReader stdout = null;
-        try {
-            process = new ProcessBuilder(new String[] {"which", file}).start();
-            stdout = new BufferedReader(new InputStreamReader(process.getInputStream()));
-            String fullPath = stdout.readLine();
-
-            return (process.waitFor() == 0) ? fullPath : null;
-        } catch (InterruptedException e) {
-            // Keep the interrupt visible to the caller's thread.
-            Thread.currentThread().interrupt();
-        } catch (Exception e) {
-            // Deliberately ignored: the lookup is optional, null means "not found".
-        } finally {
-            // Release the pipe to the child process instead of waiting for GC.
-            if (stdout != null) {
-                try {
-                    stdout.close();
-                } catch (IOException ignored) {
-                    // nothing useful can be done if close fails
-                }
-            }
-            if (process != null) {
-                process.destroy();
-            }
-        }
-        return null;
-     }
-               
-     private static final char SINGLE_QUOTE = '\'';
-     private static final char DOUBLE_QUOTE = '"';
-     private static String[] splitArgs(String command) throws ParseException {
-        List<String> argv = new ArrayList<String>();
-
-        int beginIndex = 0;
-        
-        while (beginIndex < command.length()) {
-            // Skip spaces
-            while (Character.isWhitespace(command.charAt(beginIndex))) {
-                ++beginIndex;
-            }
-            
-            char delim = ' ';
-            char charAtIndex = command.charAt(beginIndex);
-            if (charAtIndex == SINGLE_QUOTE || charAtIndex == DOUBLE_QUOTE) {
-                delim = charAtIndex;
-            }
-            
-            int endIndex = command.indexOf(delim, beginIndex+1);
-            if (endIndex == -1) {
-                if (Character.isWhitespace(delim)) {
-                    // Reached end of command-line
-                    argv.add(command.substring(beginIndex));
-                    break;
-                } else {
-                    // Didn't find the ending quote/double-quote
-                    throw new ParseException("Illegal command: " + command);
-                }
-            }
-            
-            if (Character.isWhitespace(delim)) {
-                // Do not consume the space
-                argv.add(command.substring(beginIndex, endIndex));
-            } else {
-                argv.add(command.substring(beginIndex, endIndex+1));
-            }
-           
-            beginIndex = endIndex + 1;
-        }
-        
-        return argv.toArray(new String[argv.size()]);
-    }
-	 
-	 //BEGIN
-	 //I am maintaining state about the operators that should
-	 //serve as the inputs to generate in the foreach logical
-	 //plan. I did not want to pass this structure around for
-	 //the entire parse tree
-
-	 private boolean insideGenerate = false; //to check if we are parsing inside a generate statement
-	 private List<LogicalOperator> generateInputs = new ArrayList<LogicalOperator>();
-
-	boolean insideGenerate() {
-		return insideGenerate;
-	}
-
-	void setInsideGenerate(boolean b) {
-		insideGenerate = b;
-	}
-
-	List<LogicalOperator> getGenerateInputs() {
-	 	return generateInputs;
-	}
-
-	void resetGenerateInputs() {
-		generateInputs.clear();
-	}
-
-	void addGenerateInput(LogicalOperator op) {
-		generateInputs.add(op);
-	}
-
-	void resetGenerateState() {
-		insideGenerate = false;
-		resetGenerateInputs();
-	}
-
-    boolean checkGenerateInput(LogicalOperator in) {
-        if(null == generateInputs) return false;
-        for(LogicalOperator op: generateInputs) {
-            if(op == in) return true;
-        }
-        return false;
-    }
-
-	 //END
-	
-	private static Map<String, Byte> nameToTypeMap = DataType.genNameToTypeMap();
-
-    public void addLogicalPlan(LogicalOperator op, LogicalPlan plan) {
-        aliases.put(op, plan);
-    }
-
-    public LogicalPlan getLogicalPlan(LogicalOperator op) {
-        return aliases.get(op);
-    }
-
-    public static void attachPlan(LogicalPlan lp, LogicalOperator root, LogicalPlan rootPlan, Map<LogicalOperator, Boolean> rootProcessed) throws ParseException {
-        log.trace("Entering attachPlan");
-        if(null == rootProcessed) {
-            rootProcessed = new HashMap<LogicalOperator, Boolean>();
-        }
-        if((rootProcessed.get(root) != null) && (rootProcessed.get(root))) {
-            log.trace("Root has been processed");
-            log.trace("Exiting attachPlan");
-            return;
-        }
-        lp.add(root);
-        log.debug("Added operator " + root + " to the logical plan " + lp);
-        if(null == rootPlan.getPredecessors(root)) {
-            log.trace("Exiting attachPlan");
-            return;
-        }
-        for(LogicalOperator rootPred: rootPlan.getPredecessors(root)) {
-            attachPlan(lp, rootPred, rootPlan, rootProcessed);
-            rootProcessed.put(rootPred, true);
-            try {
-                lp.connect(rootPred, root);
-                log.debug("Connected operator " + rootPred + " to " + root + " in the logical plan " + lp);
-            } catch (FrontendException fee) {
-                ParseException pe = new ParseException(fee.getMessage());
-                pe.initCause(fee); 
-                throw pe;
-            }
-        }
-        log.trace("Exiting attachPlan");
-    }
-
-    boolean isColumnProjectionsOrStar(CogroupInput cgi) {
-        if (cgi == null || cgi.plans == null || cgi.plans.size() == 0) {
-            return false;
-        }
-        for (LogicalPlan keyPlan: cgi.plans) {
-            for (LogicalOperator op : keyPlan) {
-                if(!(op instanceof LOProject)) {
-                    return false;
-                }
-            }
-        }
-        return true;    
-    }
-    
-    static String constructFileNameSignature(String fileName, FuncSpec funcSpec) {
-        return fileName+"_"+funcSpec.toString();
-    }
-    
-    ExpressionOperator attachColPosToReadScalar(LogicalPlan lp, 
-            ExpressionOperator expr, int colNum, Schema over) 
-    throws PlanException, FrontendException{
-    
-        scalarFound = false;
-        // We also need to attach LOConst to the userfunc 
-        // so that it can read that projection number in ReadScalars UDF
-        LOConst rconst = new LOConst(lp, new OperatorKey(scope, getNextId()), colNum);
-        rconst.setType(DataType.INTEGER);
-        lp.add(rconst);
-        lp.connect(rconst, expr);
-
-//        if(over != null && over.getField(colNum).type != DataType.BYTEARRAY) {
-//            LOCast loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), over.getField(colNum).type);
-//            lp.add(loCast);
-//            lp.connect(expr, loCast);
-//            expr = loCast;
-//        } 
-        return expr;
-    }    
-}
-
-
-
-/**
- * Byte codes naming the kinds of user functions the parser deals with,
- * plus a helper that checks an object against one of those kinds.
- */
-class FunctionType {
-    public static final byte UNKNOWNFUNC = 0;
-    public static final byte EVALFUNC = 2;
-    public static final byte COMPARISONFUNC = 4;
-    public static final byte LOADFUNC = 8;
-    public static final byte STOREFUNC = 16;
-    public static final byte PIGTOSTREAMFUNC = 32;
-    public static final byte STREAMTOPIGFUNC = 64;
-
-    /**
-     * Casts {@code func} to the type that matches {@code funcType}. The
-     * cast result is discarded; the call exists only for the
-     * ClassCastException raised when the object is of the wrong kind.
-     *
-     * @param func     the instantiated function object
-     * @param funcType one of the constants declared in this class
-     * @throws Exception when {@code funcType} is not one of the six
-     *                   concrete kinds (UNKNOWNFUNC included)
-     */
-    public static void tryCasting(Object func, byte funcType) throws Exception {
-        if (funcType == FunctionType.EVALFUNC) {
-            EvalFunc asEval = (EvalFunc) func;
-        } else if (funcType == FunctionType.COMPARISONFUNC) {
-            ComparisonFunc asComparison = (ComparisonFunc) func;
-        } else if (funcType == FunctionType.LOADFUNC) {
-            LoadFunc asLoad = (LoadFunc) func;
-        } else if (funcType == FunctionType.STOREFUNC) {
-            StoreFuncInterface asStore = (StoreFuncInterface) func;
-        } else if (funcType == FunctionType.PIGTOSTREAMFUNC) {
-            PigToStream asPigToStream = (PigToStream) func;
-        } else if (funcType == FunctionType.STREAMTOPIGFUNC) {
-            StreamToPig asStreamToPig = (StreamToPig) func;
-        } else {
-            throw new Exception("Received an unknown function type: " + funcType);
-        }
-    }
-};
-
-/**
- * Byte codes naming the kinds of user-supplied classes the parser
- * validates, plus the validation helper.
- */
-class ClassType {
-    public static final byte UNKNOWNCLASS = 0;
-    public static final byte PARTITIONER = 2;
-
-    /**
-     * Verifies that {@code cs} is of the kind named by {@code classType}.
-     *
-     * @param cs        the class to check
-     * @param classType one of the constants declared in this class
-     * @throws Exception when {@code cs} is null or not a
-     *                   {@code Partitioner} type (for PARTITIONER), or
-     *                   when {@code classType} is not a known kind
-     */
-    public static void checkClassType(Class<?> cs, byte classType) throws Exception {
-        switch (classType) {
-        case ClassType.PARTITIONER:
-            // Check the type relationship on the Class object itself. Creating
-            // an instance just to test it would run the user's constructor at
-            // parse time and would fail with an unrelated instantiation error
-            // for classes without an accessible no-arg constructor.
-            if (cs == null || !Partitioner.class.isAssignableFrom(cs)) {
-                throw new Exception("Not a class of org.apache.hadoop.mapreduce.Partitioner");
-            }
-            break;
-        default:
-            throw new Exception("Received an unknown class type: " + classType);
-        }
-    }
-}
-
-PARSER_END(QueryParser)
-
-// Skip all the new lines, tabs and spaces
-SKIP : { " " |	"\r" |	"\t" |	"\n" }
-
-// Skip comments(single line and multiline)
-SKIP : {
-   <"--"(~["\r","\n"])*>
-|  <"/*" (~["*"])* "*" ("*" | (~["*","/"] (~["*"])* "*"))* "/">
-}
-// Every keyword token added to the following list (i.e. declared before IDENTIFIER) must also
-// be added to the IdentifierOrReserved production so that package names may contain that keyword.
-// Also add it to TestLogicalPlanBuilder.testReservedWordsInFunctionNames.
-
-// Comparison operators that can be used in a filter:
-TOKEN : { <#STRFILTEROP : "eq" | "gt" | "lt" | "gte" | "lte" | "neq" > }
-TOKEN : { <#NUMFILTEROP : "==" | "<" | "<=" | ">" | ">=" | "!=" > }
-TOKEN : { <FILTEROP : <STRFILTEROP> | <NUMFILTEROP>  > }
-
-// List all the keywords in the language
-TOKEN : { <DEFINE : "define"> }
-TOKEN : { <LOAD : "load"> }
-TOKEN : { <FILTER : "filter"> }
-TOKEN : { <FOREACH : "foreach"> }
-TOKEN : { <MATCHES : "matches"> }
-TOKEN : { <ORDER : "order"> }
-TOKEN : { <ARRANGE : "arrange"> }
-TOKEN : { <DISTINCT : "distinct"> }
-TOKEN : { <COGROUP : "cogroup"> }
-TOKEN : { <JOIN : "join"> }
-TOKEN : { <CROSS : "cross"> }
-TOKEN : { <UNION : "union"> }
-TOKEN : { <SPLIT : "split"> }
-TOKEN : { <INTO : "into"> }
-TOKEN : { <IF : "if"> }
-TOKEN : { <ALL : "all"> }
-TOKEN : { <ANY : "any"> }
-TOKEN : { <AS : "as">	}
-TOKEN : { <BY : "by">	}
-TOKEN : { <USING : "using"> }
-TOKEN : { <INNER : "inner"> }
-TOKEN : { <OUTER : "outer"> }
-TOKEN : { <ONSCHEMA : "ONSCHEMA"> }
-TOKEN : { <STAR : "*"> 		}
-TOKEN : { <PARALLEL : "parallel"> }
-TOKEN : { <PARTITION : "partition"> }
-TOKEN : { <GROUP : "group"> }
-TOKEN : { <AND : "and"> }
-TOKEN : { <OR : "or"> }
-TOKEN : { <NOT : "not"> }
-TOKEN : { <GENERATE : "generate"> }
-TOKEN : { <FLATTEN : "flatten"> }
-TOKEN : { <EVAL : "eval"> }
-TOKEN : { <ASC : "asc"> }
-TOKEN : { <DESC : "desc"> }
-TOKEN : { <INT : "int"> }
-TOKEN : { <LONG : "long"> }
-TOKEN : { <FLOAT : "float"> }
-TOKEN : { <DOUBLE : "double"> }
-TOKEN : { <CHARARRAY : "chararray"> }
-TOKEN : { <BYTEARRAY : "bytearray"> }
-TOKEN : { <BAG : "bag"> }
-TOKEN : { <TUPLE : "tuple"> }
-TOKEN : { <MAP : "map"> }
-TOKEN : { <IS : "is"> }
-TOKEN : { <NULL : "null"> }
-TOKEN : { <STREAM : "stream"> }
-TOKEN : { <THROUGH : "through"> }
-TOKEN : { <STORE : "store"> }
-TOKEN : { <MAPREDUCE: "mapreduce">}
-TOKEN : { <SHIP: "ship"> }
-TOKEN : { <CACHE: "cache"> }
-TOKEN : { <INPUT: "input"> }
-TOKEN : { <OUTPUT: "output"> }
-TOKEN : { <ERROR: "stderr"> }
-TOKEN : { <STDIN: "stdin"> }
-TOKEN : { <STDOUT: "stdout"> }
-TOKEN : { <LIMIT: "limit"> }
-TOKEN : { <SAMPLE: "sample"> }
-TOKEN : { <LEFT: "left"> }
-TOKEN : { <RIGHT: "right"> }
-TOKEN : { <FULL: "full"> }
-
-// Identifier token and the private (#) character classes it is built from.
-// An IDENTIFIER starts with one or more letters, followed by any mix of
-// digits, letters, "_" and the two-character sequence "::".
-// FSSPECIALCHAR is declared here but not referenced by IDENTIFIER.
-TOKEN:
-{
- 	<#LETTER : ["a"-"z", "A"-"Z"] >
-|	<#DIGIT : ["0"-"9"] >
-|   <#SPECIALCHAR : ["_"] >
-|   <#FSSPECIALCHAR: ["-", ":", "/"]>
-|	<IDENTIFIER: ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> | "::")* >
-}
-// Define Numeric Constants
-// FLOATINGPOINT is a private (#) helper: digits with an optional fraction, or
-// a bare fraction starting with ".".
-// The "l"/"L" suffix of LONGINTEGER and the "f"/"F" suffix of FLOATNUMBER are
-// optional in the patterns, so the patterns overlap.
-// NOTE(review): under JavaCC's longest-match / first-declared rule the
-// declaration order below decides which token kind an unsuffixed number
-// gets (a plain digit string should come out as INTEGER) -- confirm before
-// reordering.
-TOKEN :
-{
-//	< NUMBER: <INTEGER> | <LONGINTEGER> | <DOUBLENUMBER> | <FLOATNUMBER> >
- 	< #FLOATINGPOINT: <INTEGER> ( "." <INTEGER> )? | "." <INTEGER> >
-| 	< INTEGER: ( <DIGIT> )+ >
-| 	< LONGINTEGER: <INTEGER> (["l","L"])? >
-|   < DOUBLENUMBER: <FLOATINGPOINT> ( ["e","E"] ([ "-","+"])? <FLOATINGPOINT> )?>
-|   < FLOATNUMBER: <DOUBLENUMBER> (["f","F"])? >
-}
-
-// A single-quoted string that stays on one line. Inside the quotes it accepts:
-//   - any character other than a quote, a backslash, "\n" or "\r";
-//   - a backslash followed by one of n, t, b, r, f, backslash or quote;
-//   - "\u" followed by exactly four hexadecimal digits.
-TOKEN : { <QUOTEDSTRING :  "'"
-(   (~["'","\\","\n","\r"])
-  | ("\\"
-      ( ["n","t","b","r","f","\\","'"] )
-    )
-  | ("\\u"
-        ["0"-"9","A"-"F","a"-"f"]
-        ["0"-"9","A"-"F","a"-"f"]
-        ["0"-"9","A"-"F","a"-"f"]
-        ["0"-"9","A"-"F","a"-"f"]
-    )
-)*
-"'"> }
-
-
-// A single-quoted string that spans more than one line. The part before the
-// first line break follows the same rules as QUOTEDSTRING; one "\n" or "\r"
-// is then required; after it, raw line breaks are allowed along with the same
-// escapes. A string without any line break does not match this token.
-TOKEN:{ < QUOTED_MULTI_STRING :  
-"'" (  (~["'","\\","\n","\r"])
-       | ("\\"  ["n","t","b","r","f","\\","'"] )
-       | ("\\u" ["0"-"9","A"-"F","a"-"f"]["0"-"9","A"-"F","a"-"f"]
-                ["0"-"9","A"-"F","a"-"f"]["0"-"9","A"-"F","a"-"f"])
-    )* 
-    ["\n","\r"] 
-    (  (~["'","\\"] )
-       | ("\\"  ["n","t","b","r","f","\\","'"] )
-       | ("\\u" ["0"-"9","A"-"F","a"-"f"]["0"-"9","A"-"F","a"-"f"]
-                ["0"-"9","A"-"F","a"-"f"]["0"-"9","A"-"F","a"-"f"])
-    )*
-"'"> 
-}
-
-
-TOKEN : { <EXECCOMMAND : "`" (~["`"])* "`"> }
-// Pig has special variables starting with $
-TOKEN : { <DOLLARVAR : "$" <INTEGER> > }
-
-// Parse is the starting production. It parses one ";"-terminated statement
-// into a fresh LogicalPlan and returns that plan.
-//
-// Alternatives, in order:
-//   1. alias = alias [AS (schema)] ;   -> rejected with a ParseException
-//   2. alias = Expr ;                  -> the alias is set on the root, registered
-//                                         with addAlias and recorded as the last alias
-//   3. Expr ;
-//   4. SPLIT SplitClause ;
-//
-// When a root operator was produced, project(*) leaves are translated and the
-// plan is registered through addLogicalPlan; a FrontendException raised there
-// is rethrown as a ParseException with the original as its cause. Finally,
-// every root of the new plan that has an entry in "aliases" is passed to
-// attachPlan together with that entry.
-LogicalPlan Parse() : 
-{
-	LogicalOperator root = null; 
-	Token t1;
-	Token t2; 
-	LogicalPlan lp = new LogicalPlan();
-	log.trace("Entering Parse");
-}
-{
-	(
-	LOOKAHEAD(3)
-	// For now don't allow A = B; kind of statements - this should
-	// be fixed and allowed in the future
-	(t1 = <IDENTIFIER> "=" t2 = <IDENTIFIER> [ <AS> "(" TupleSchema() ")" ] ";" { 
-			throw new ParseException(
-			"Currently PIG does not support assigning an existing relation (" + t1.image + ") to another alias (" + t2.image + ")");})
-|	LOOKAHEAD(2) 
-	(t1 = <IDENTIFIER> "=" root = Expr(lp) ";" {
-	  root.setAlias(t1.image);
-	  addAlias(t1.image, root);
-	  pigContext.setLastAlias(t1.image);
-	})
-|	(root = Expr(lp) ";")
-|	(<SPLIT> root = SplitClause(lp) ";")
-	)
-	{ 
-		if(null != root) {
-            try {
-                log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
-    
-                //Translate all the project(*) leaves in the plan to a sequence of projections
-                ProjectStarTranslator translate = new ProjectStarTranslator(lp);
-                translate.visit();
-    
-                addLogicalPlan(root, lp);
-            
-                log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
-            } catch(FrontendException fee) {
-                ParseException pe = new ParseException(fee.getMessage());
-                pe.initCause(fee);  
-                throw pe;
-            }
-		}
-
-        // Copy the roots first: the loop below works on this snapshot rather
-        // than on the collection returned by lp.getRoots().
-        ArrayList<LogicalOperator> roots = new ArrayList<LogicalOperator>(lp.getRoots().size());
-        for(LogicalOperator op: lp.getRoots()) {
-            roots.add(op);
-        }
-        
-        Map<LogicalOperator, Boolean> rootProcessed = new HashMap<LogicalOperator, Boolean>();
-        for(LogicalOperator op: roots) {
-            //At this point we have a logical plan for the pig statement.
-            //In order to construct the entire logical plan we need to traverse
-            //each root and get the logical plan it belongs to. From each of those
-            //plans we need the predecessors of the root of the current logical plan
-            //and so on. This is a computationally intensive operation but should
-            //be fine as it is restricted to the parser.
-
-            LogicalPlan rootPlan = aliases.get(op);
-            if(null != rootPlan) {
-                attachPlan(lp, op, rootPlan, rootProcessed);
-                rootProcessed.put(op, true);
-            }
-        }
-		
-		log.trace("Exiting Parse");
-		return lp; 
-	}
-}
-
-// Body of a SPLIT statement (the SPLIT keyword itself is consumed by the caller):
-//   NestedExpr INTO alias IF cond ( , alias IF cond )+
-// At least two outputs are required, because the repeated part uses "+".
-// Creates an LOSplit, connects the input to it, and registers each output
-// with its own condition plan through addSplitOutput, numbering the outputs
-// from 0. Returns the LOSplit operator.
-LogicalOperator SplitClause(LogicalPlan lp):
-{
-	LogicalOperator input; 
-	ExpressionOperator cond; 
-	Token alias; 
-	LOSplit splitOp; 
-	int index = 0; 
-	LogicalPlan condPlan; 
-	log.trace("Entering SplitClause");
-}
-{
-	(
-	input = NestedExpr(lp) <INTO> 
-	{
-		// NOTE(review): the split is created with the input's operator key, not
-		// a fresh key from getNextId() as other clauses do -- confirm this is intended.
-		splitOp = new LOSplit(lp, input.getOperatorKey(), new ArrayList<LogicalOperator>());
-		lp.add(splitOp);
-		log.debug("Adding operator " + splitOp.getClass().getName() + " to the logical plan");		
-        lp.connect(input, splitOp);
-		log.debug("Connected alias: " + input.getAlias() + " operator " + input.getClass().getName() + " to operator " + splitOp.getClass().getName());
-	}
-	// Each condition is parsed into its own new LogicalPlan (condPlan); the
-	// returned expression (cond) is not used further in this production.
-	alias = <IDENTIFIER> <IF> cond = PCond(input.getSchema(), null, condPlan = new LogicalPlan(), input) 
-	{
-		addSplitOutput(lp, splitOp, alias.image, condPlan, index);
-		++index;
-		log.debug("Added splitoutput");
-	}
-	(
-	"," alias = <IDENTIFIER> <IF> cond = PCond(input.getSchema(), null, condPlan = new LogicalPlan(), input)
-	{
-		addSplitOutput(lp, splitOp, alias.image, condPlan, index);
-		++index;
-		log.debug("Added splitoutput");
-	}
-	)+
-	)
-	{log.trace("Exiting SplitClause"); return splitOp;}
-} 
-
-// A relational expression: either a NestedExpr optionally followed by
-// "AS ( TupleSchema )", or a BaseExpr.
-// When a schema is given, Schema.setSchemaDefaultType is applied to it with
-// DataType.BYTEARRAY and the schema is then set on the operator.
-// Returns the operator built by the chosen alternative.
-LogicalOperator Expr(LogicalPlan lp) : 
-{
-	LogicalOperator op; 
-	Schema schema = null; 
-	log.trace("Entering Expr");
-}
-{
-	(
-	( op = NestedExpr(lp) [ <AS> "(" schema = TupleSchema() ")" {Schema.setSchemaDefaultType(schema, DataType.BYTEARRAY); op.setSchema(schema);} ] )
-|	op = BaseExpr(lp)
-	)
-	{log.trace("Exiting Expr"); return op;}
-}	
-
-// An operand of a relational operator: an alias reference, a parenthesized
-// NestedExpr, or a parenthesized BaseExpr. The two-token lookahead separates
-// the two alternatives that both start with "(".
-// The locals eOp and specs are declared but not used in this production.
-LogicalOperator NestedExpr(LogicalPlan lp) : 
-{
-	LogicalOperator op; 
-	ExpressionOperator eOp;
-	Map<String, LogicalOperator> specs = null; 
-	log.trace("Entering NestedExpr");
-}
-{
-	(
-	(op = Alias(lp))
-|	LOOKAHEAD(2) ( "(" op = NestedExpr(lp) ")" )
-|	( "(" op = BaseExpr(lp) ")" )
-	)
-	{log.trace("Exiting NestedExpr"); return op;}
-}
-
-// A keyword or an identifier.
-// Accepts any single keyword token, a FILTEROP, or an IDENTIFIER and returns
-// the matched token. Every keyword token declared before IDENTIFIER has to be
-// listed here; ONSCHEMA is included so that it is treated like the other
-// keywords. STAR ("*") is not listed because it is not a word.
-
-Token IdentifierOrReserved() :
-{
-  Token t1; 
-  log.trace("Entering IdentifierOrReserved");
-}
-{
-  (
-  (t1 = <FILTEROP> )
-| (t1 = <DEFINE> )
-| (t1 = <LOAD> )
-| (t1 =<FILTER> )
-| (t1 =<FOREACH> )
-| (t1 =<MATCHES> )
-| (t1 =<ORDER> )
-| (t1 =<ARRANGE> )
-| (t1 =<DISTINCT> )
-| (t1 =<COGROUP> )
-| (t1 =<JOIN> )
-| (t1 =<CROSS> )
-| (t1 =<UNION> )
-| (t1 =<SPLIT> )
-| (t1 =<INTO> )
-| (t1 =<IF> )
-| (t1 =<ALL> )
-| (t1 =<ANY> )
-| (t1 =<AS> )
-| (t1 =<BY> )
-| (t1 =<USING> )
-| (t1 =<INNER> )
-| (t1 =<OUTER> )
-| (t1 =<ONSCHEMA> )
-| (t1 =<PARALLEL> )
-| (t1 =<PARTITION>)
-| (t1 =<GROUP> )
-| (t1 =<AND> )
-| (t1 =<OR> )
-| (t1 =<NOT> )
-| (t1 =<GENERATE> )
-| (t1 =<FLATTEN> )
-| (t1 =<EVAL> )
-| (t1 =<ASC> )
-| (t1 =<DESC> )
-| (t1 =<INT> )
-| (t1 =<LONG> )
-| (t1 =<FLOAT> )
-| (t1 =<DOUBLE> )
-| (t1 =<CHARARRAY> )
-| (t1 =<BYTEARRAY> )
-| (t1 =<BAG> )
-| (t1 =<TUPLE> )
-| (t1 =<MAP> )
-| (t1 =<IS> )
-| (t1 =<NULL> )
-| (t1 =<STREAM> )
-| (t1 =<THROUGH> )
-| (t1 =<STORE> )
-| (t1 =<MAPREDUCE>)
-| (t1 =<SHIP> )
-| (t1 =<CACHE> )
-| (t1 =<INPUT> )
-| (t1 =<OUTPUT> )
-| (t1 =<ERROR> )
-| (t1 =<STDIN> )
-| (t1 =<STDOUT> )
-| (t1 =<LIMIT> )
-| (t1 =<SAMPLE> )
-| (t1 =<LEFT>)
-| (t1 =<RIGHT>)
-| (t1 =<FULL>)
-| (t1 =<IDENTIFIER>)
-)
-    {
-      return t1;
-    }
-}
-
-// A reference to an alias.
-// Looks the identifier up with getOp; an unknown alias raises a
-// ParseException ("Unrecognized alias ..."). Otherwise the alias is registered
-// again through addAlias, the operator is added to the current plan, and the
-// operator is returned. The local "op" is declared but not used.
-LogicalOperator Alias(LogicalPlan lp) : 
-{
-	Token t1; 
-	LogicalOperator op; 
-	log.trace("Entering Alias");
-}
-{
-	t1 = <IDENTIFIER> 
-	{
-		LogicalOperator aliasOp;
-		String alias = t1.image;
-		
-		aliasOp = getOp(alias);
-		if (aliasOp == null) {
-			throw new ParseException("Unrecognized alias " + alias);
-		}
-		addAlias(alias, aliasOp);
-		log.debug("Added " + alias + " to aliasOp");
-		
-		lp.add(aliasOp);
-		log.debug("Added operator: " + aliasOp.getClass().getName() + " to the logical plan " + lp);
-		log.trace("Exiting Alias");
-		return aliasOp;
-	}
-}
-
-// A relational operator application. Dispatches on the leading keyword
-// (DEFINE, LOAD, GROUP/COGROUP, FILTER, LIMIT, SAMPLE, ORDER, DISTINCT, CROSS,
-// JOIN, UNION, FOREACH, STREAM, STORE, MAPREDUCE) to the matching clause, then
-// accepts an optional "PARALLEL <integer>" suffix.
-//
-// Handled inline rather than in a clause production:
-//   - LOAD / STREAM: optional "AS (TupleSchema)" or "AS AtomSchema";
-//   - DISTINCT: optional "PARTITION BY <class>", builds the LODistinct;
-//   - STORE: resolves the store's alias in mapAliasOp and connects that
-//     operator to the store; an unknown alias raises a ParseException.
-//
-// Returns the operator produced by the chosen alternative.
-LogicalOperator BaseExpr(LogicalPlan lp) : 
-{
-	LogicalOperator op; 
-	Schema schema; 
-	Token t1, t2; 
-	Schema.FieldSchema fs; 
-	log.trace("Entering BaseExpr");
-	String partitioner = null;
-}
-{
-	(
-	(
-    (<DEFINE> op = DefineClause(lp))
-|	(<LOAD> op = LoadClause(lp) 
-		[ <AS> 
-        (
-            LOOKAHEAD(2) "(" schema = TupleSchema() ")" 
-            {
-                Schema.setSchemaDefaultType(schema, DataType.BYTEARRAY); 
-                op.setSchema(schema); 
-                log.debug("Load as schema" + schema);
-            } 
-        |   fs = AtomSchema() 
-            {
-                schema = new Schema(fs); 
-                op.setSchema(schema); 
-                log.debug("Load as atomschema" + schema);
-            }
-        ) 
-        ]
-    )
-|	((<GROUP> | <COGROUP>) op = CogroupClause(lp))
-|	(<FILTER> op = FilterClause(lp))
-|   (<LIMIT> op = LimitClause(lp))
-|   (<SAMPLE> op = SampleClause(lp))
-|   (<ORDER> op = OrderClause(lp))
-|	(<DISTINCT> op = NestedExpr(lp) 
-	([<PARTITION> <BY> (partitioner = EvalClass(ClassType.PARTITIONER))])
-	{
-		// partitioner stays null when no PARTITION BY was given.
-		LogicalOperator distinct = new LODistinct(lp, new OperatorKey(scope, getNextId())); 
-		lp.add(distinct);
-		distinct.setCustomPartitioner(partitioner);
-		log.debug("Added operator: " + distinct.getClass().getName() + " to the logical plan"); 
-		lp.connect(op, distinct);
-		log.debug("Connected alias: " + op.getAlias() + " operator " + op.getClass().getName() + " to operator " + distinct.getClass().getName());
-        op = distinct;
-	})
-|	(<CROSS> op = CrossClause(lp))
-|   (<JOIN> op = JoinClause(lp))
-|	(<UNION> op = UnionClause(lp))
-|	(<FOREACH> op = ForEachClause(lp))
-|   (<STREAM> op = StreamClause(lp) 
-        [ <AS> 
-        (
-            LOOKAHEAD(2) "(" schema = TupleSchema() ")" 
-            {
-                Schema.setSchemaDefaultType(schema, DataType.BYTEARRAY); 
-                op.setSchema(schema); 
-                log.debug("Stream as schema()"+ schema);
-            } 
-        | fs = AtomSchema() 
-            {
-                schema = new Schema(fs);
-                op.setSchema(schema);
-                log.debug("Stream as atomschema()" + schema);
-            }
-        ) 
-        ]
-    )
-|   (<STORE> op = StoreClause(lp))
-{
-    	// The store operator's alias names the relation being stored; look it
-    	// up and wire that operator in as the store's input.
-    	String inputAlias = ((LOStore)op).getAlias();
-    	LogicalOperator input = mapAliasOp.get(inputAlias);
-        if (input == null)
-            throw new ParseException("Unable to find alias " + inputAlias);
-
-        lp.add(input);
-        lp.connect(input, op);
-}
-|	(<MAPREDUCE> op = MapReduceClause(lp))
-	)
-    [<PARALLEL> t2=<INTEGER> {
-      // In Local Mode we can only use one reducer
-    	if( this.pigContext.getExecType() == ExecType.LOCAL ) {
-    		op.setRequestedParallelism(1);
-    	} else {
-    		op.setRequestedParallelism(Integer.parseInt(t2.image));
-    	}
-    } ]
-	)	
-	{log.trace("Exiting BaseExpr"); return op;}
-}
-
-// Body of a LOAD statement (the LOAD keyword is consumed by the caller):
-//   FileName [ USING NonEvalFuncSpec ]
-// Without USING, PigStorage is used as the load function. The file name is
-// converted to an absolute path by the load function, the result is cached in
-// fileNameMap under constructFileNameSignature(filename, funcSpec), and an
-// LOLoad is created, added to the plan and returned.
-// An IOException is rethrown wrapped in java.lang.Error (see the comment in
-// the catch block for why).
-// Several locals (t1..t3, funcName, funcArgs, splitBy, splittable) are
-// declared but not used in this production.
-LogicalOperator LoadClause(LogicalPlan lp) : 
-{
-	Token t1, t2, t3; 
-	String filename; 
-	String funcName,funcArgs =null;
-	FuncSpec funcSpec = null;
-	String funcSpecAsString = null; 
-	LOLoad lo=null; 
-    String splitBy;
-    boolean splittable = true;
-	log.trace("Entering LoadClause");
-}
-{
-	(	filename = FileName()
-		(
-        <USING>  funcSpec = NonEvalFuncSpec(FunctionType.LOADFUNC)
-		)?
-	)
-	{
-		// Default load function when the statement has no USING part.
-		if (funcSpec == null){
-			funcSpecAsString = PigStorage.class.getName();
-			funcSpec = new FuncSpec(funcSpecAsString);
-			log.debug("LoadClause: funcSpec = " + funcSpec);
-		}
-
-        Object obj = PigContext.instantiateFuncFromSpec(funcSpec);
-        LoadFunc loFunc = (LoadFunc)obj;
-        try {
-        
-            // If we converted the file name before, we return the old
-            // result. This is not done for performance but to make sure
-            // we return the same result for relative paths when
-            // re-parsing the same script for execution.
-            // For example if a script has
-            // a = load ...
-            // ...
-            // cd x
-            // ...
-            // store c into 'foo'; => the abs. path for 'foo' would be '/user/<username>/x/foo'
-            // ..
-            // cd y
-            // ..
-            // store f into 'bar'=> the abs. path for 'bar' would be '/user/<username>/x/y/bar'
-            // While re-parsing, the current working dir is already at /user/<username>/x/y
-            // so translating 'foo' to its absolute path based on that dir would give incorrect
-            // results - hence we store the translations into a map during the first parse for
-            // use during the reparse.
-            String absolutePath = fileNameMap.get(constructFileNameSignature(filename, funcSpec));
-            if (absolutePath == null) {
-                absolutePath = loFunc.relativeToAbsolutePath(filename, getCurrentDir(pigContext));
-                
-                if (absolutePath!=null) {
-                    setHdfsServers(absolutePath, pigContext);
-                }
-                // The translation is stored even when it is null.
-                fileNameMap.put(constructFileNameSignature(filename, funcSpec), absolutePath);
-            }
-            lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(absolutePath, funcSpec),
-                  ConfigurationUtil.toConfiguration(pigContext.getProperties()));
-        } catch (IOException ioe) {
-            // The autogenerated parser code only catches RuntimeException and
-            // ParseException as special Exceptions. All others are caught as
-            // Throwable and then re-thrown by casting to ERROR - this can result
-            // in ClassCastException if it is due to the IOException here - so
-            // wrap the IOException in an "Error" object and throw the Error here
-            Error e = new Error(ioe);
-            throw e;
-        }
-		lp.add(lo);
-		log.debug("Added operator " + lo.getClass().getName() + " to the logical plan");	
-		
-		log.trace("Exiting LoadClause");
-		return lo;
-	} 
-}    
-
-// A possibly empty, comma-separated list of QUOTEDSTRING tokens.
-// Each token image is passed through StringUtils.unescapeInputString and the
-// results are joined with "," into one string, which is returned. An empty
-// list yields the empty string.
-String StringList() : 
-{
-	StringBuilder sb = new StringBuilder(); 
-	Token t;
-}
-{
-	(
-	(
-	t = <QUOTEDSTRING> {sb.append(StringUtils.unescapeInputString(t.image));}
-	( "," t = <QUOTEDSTRING> {sb.append(",");sb.append(StringUtils.unescapeInputString(t.image));} )*
-	)
-	| {}
-	)
-	{log.debug("StringList: " + sb.toString()); return sb.toString();}
-}
-
-// A possibly empty, comma-separated list of function arguments. Each
-// argument is either a QUOTED_MULTI_STRING or a QUOTEDSTRING.
-// Each token image is passed through StringUtils.unescapeInputString and the
-// results are joined with "," into one string, which is returned. An empty
-// list yields the empty string.
-String FunctionArgs() : 
-{
-       StringBuilder sb = new StringBuilder(); 
-       Token t;
-}
-{
-(
-       (
-         ( ( t = <QUOTED_MULTI_STRING> {sb.append(StringUtils.unescapeInputString(t.image));} )
-           | 
-           ( t = <QUOTEDSTRING> {sb.append(StringUtils.unescapeInputString(t.image));})
-         )
-         ( "," 
-           (
-            (t = <QUOTED_MULTI_STRING> {sb.append(",");sb.append(StringUtils.unescapeInputString(t.image));} ) 
-            | 
-            (t = <QUOTEDSTRING> {sb.append(",");sb.append(StringUtils.unescapeInputString(t.image));} ) 
-           ) 
-         )*
-       )
-       | {}
-)
-       {log.debug("FuncArgs: " + sb.toString()); return sb.toString();}
-}
-
-
-
-// Body of a MAPREDUCE statement (the MAPREDUCE keyword is consumed by the caller):
-//   'mymr.jar' [ ( PathList ) ] STORE StoreClause LOAD LoadClause [ AS schema ] [ `params` ]
-//
-// The main jar and every jar in the optional path list are registered with
-// pigContext.addJar. The store is marked as a temporary store and connected
-// to the relation named by its alias (an unknown alias raises a
-// ParseException). An LONative operator is then placed between the store and
-// the load (store -> native -> load), and the load operator is returned.
-// The optional back-quoted text is unquoted and split into the parameters
-// handed to LONative; params stays null when it is absent.
-LogicalOperator MapReduceClause(LogicalPlan lp) : 
-{
-    	LogicalOperator  loLoad;
-	LogicalOperator loStore;
-	LONative loNative;
-	Schema schema;
-	Schema.FieldSchema fs;
-	String nativeMRJar;
-	Token t1, t2;
-	String[] paths;
-	String[] params = null;
-}
-{
-	t1 = <QUOTEDSTRING> 
-	{
-		nativeMRJar = unquote(t1.image);
-		pigContext.addJar(nativeMRJar);
-	}
-	[ "("
-	  paths = PathList()
-	  {
-		  for(String path: paths) {
-			  pigContext.addJar(path);
-		  }
-	  }
-	  ")"
-	]
-	
-	<STORE>
-	loStore = StoreClause(lp)
-	{
-	    ((LOStore)loStore).setTmpStore(true);
-	    String inputAlias = ((LOStore)loStore).getAlias();
-	    LogicalOperator input = mapAliasOp.get(inputAlias);
-	    if (input == null)
-	       throw new ParseException("Unable to find alias " + inputAlias);
-	    lp.add(input);
-	    lp.connect(input, loStore);
-	    
-	}
-	<LOAD>
-	loLoad = LoadClause(lp)
-	[ <AS> 
-        (
-            LOOKAHEAD(2) "(" schema = TupleSchema() ")" 
-            {
-                Schema.setSchemaDefaultType(schema, DataType.BYTEARRAY); 
-                loLoad.setSchema(schema); 
-                log.debug("Load as schema" + schema);
-            } 
-        |   fs = AtomSchema() 
-            {
-                schema = new Schema(fs); 
-                loLoad.setSchema(schema); 
-                log.debug("Load as atomschema" + schema);
-            }
-        ) 
-    ]
-	[
-	 t2 = <EXECCOMMAND>
-	 {
-		 params = splitArgs(unquote(t2.image));
-	 }
-	]
-	{
-		loNative = new LONative(lp, new OperatorKey(scope, getNextId()),
-		        nativeMRJar, params);
-	
-		lp.add(loNative);
-		lp.connect(loStore, loNative);
-		lp.connect(loNative, loLoad);
-		return loLoad;
-	}
-}
-
-// A file name: a single QUOTEDSTRING. Returns the token image after it has
-// been passed through unquote.
-String FileName(): 
-{
-	Token t;
-}
-{
-	t = <QUOTEDSTRING> 
-	{log.debug("FileName: " + unquote(t.image)); return unquote(t.image);}
-}
-
-// Body of a FILTER statement (the FILTER keyword is consumed by the caller):
-//   NestedExpr BY PCond
-// The condition is parsed into its own plan (conditionPlan). An LOFilter is
-// created over that plan, added to lp, connected after the input, and
-// returned. The input is also re-registered under its alias via addAlias.
-LogicalOperator FilterClause(LogicalPlan lp):
-{
-	ExpressionOperator cond; LogicalOperator input; 
-	LogicalPlan conditionPlan = new LogicalPlan();
-	log.trace("Entering FilterClause");
-}
-{
-	(
-	input = NestedExpr(lp) {log.debug("Filter input: " + input);}	
-	 <BY> cond = PCond(input.getSchema(),null,conditionPlan,input)
-	 )
-	{
-		//LogicalOperator filter = new LOFilter(lp, new OperatorKey(scope, getNextId()), conditionPlan, input);
-		LogicalOperator filter = new LOFilter(lp, new OperatorKey(scope, getNextId()), conditionPlan);
-		addAlias(input.getAlias(), input);
-		lp.add(filter);
-		log.debug("Added operator " + filter.getClass().getName() + " to the logical plan");
-		
-		lp.connect(input, filter);
-		log.debug("Connected alias " + input.getAlias() + " operator " + input.getClass().getName() + " to operator " + filter.getClass().getName() +" in the logical plan");
-
-		log.trace("Exiting FilterClause");
-		return filter;
-	}
-}
-
-/*
- * "SAMPLE a x" is translated to "FILTER a BY RANDOM()<=x"
- *
- * Body of a SAMPLE statement (the SAMPLE keyword is consumed by the caller):
- *   NestedExpr <DOUBLENUMBER>
- * Builds a condition plan holding a RANDOM user function, a constant with
- * the parsed value, and a less-than-or-equal comparison fed by both; then
- * wraps the plan in an LOFilter connected after the input and returns it.
- * The sample size must lex as a DOUBLENUMBER token.
- */
-LogicalOperator SampleClause(LogicalPlan lp) :
-{
-    ExpressionOperator cond;
-    LogicalOperator input;
-    Token t;
-    LogicalPlan conditionPlan = new LogicalPlan();
-    log.trace("Entering SampleClause");
-}
-{
-    (
-    input = NestedExpr(lp) {log.debug("Filter input: " + input);}
-    t = <DOUBLENUMBER>
-    )
-    {
-        LOUserFunc rand = new LOUserFunc(conditionPlan, new OperatorKey(scope, getNextId()), new FuncSpec(RANDOM.class.getName()), DataType.DOUBLE);
-        conditionPlan.add(rand);
-
-        double l = Double.parseDouble(t.image);
-        LOConst prob = new LOConst(conditionPlan, new OperatorKey(scope, getNextId()), l);
-        conditionPlan.add(prob);
-
-        // rand is connected first and prob second, i.e. the comparison is
-        // "rand <= prob".
-        cond = new LOLesserThanEqual(conditionPlan, new OperatorKey(scope, getNextId()));
-        conditionPlan.add(cond);
-        conditionPlan.connect(rand, cond);
-        conditionPlan.connect(prob, cond);
-
-        LogicalOperator filter = new LOFilter(lp, new OperatorKey(scope, getNextId()), conditionPlan);
-        addAlias(input.getAlias(), input);
-        lp.add(filter);
-        log.debug("Added operator " + filter.getClass().getName() + " to the logical plan");
-
-        lp.connect(input, filter);
-        log.debug("Connected alias " + input.getAlias() + " operator " + input.getClass().getName() + " to operator " + filter.getClass().getName() +" in the logical plan");
-
-        log.trace("Exiting SampleClause");
-        return filter;
-    }
-}
-
-// Body of a LIMIT statement (the LIMIT keyword is consumed by the caller):
-//   NestedExpr ( <INTEGER> | <LONGINTEGER> )
-// Creates an LOLimit with the parsed count, connects it after the input and
-// returns it. The input is also re-registered under its alias via addAlias.
-LogicalOperator LimitClause(LogicalPlan lp):
-{
-        LogicalOperator input;
-        Token t;
-        long l;
-        log.trace("Entering LimitClause");
-}
-{
-        (
-        input = NestedExpr(lp) {log.debug("Limit input: " + input);}
-                (
-                   t = <INTEGER>     { l = Long.parseLong(t.image); }
-          // The last character of the image is dropped before parsing.
-          // NOTE(review): this assumes a LONGINTEGER image always ends with the
-          // l/L suffix (the token pattern makes the suffix optional) -- confirm.
-          | t = <LONGINTEGER> { l = Long.parseLong(t.image.substring(0, t.image.length() - 1)); }
-                )
-        )
-        {
-                LogicalOperator limit = new LOLimit(lp, new OperatorKey(scope, getNextId()), l);
-                addAlias(input.getAlias(), input);
-                lp.add(limit);
-                log.debug("Added operator " + limit.getClass().getName() + " to the logical plan");
-
-                lp.connect(input, limit);
-                log.debug("Connected alias " + input.getAlias() + " operator " + input.getClass().getName() + " to operator " + limit.getClass().getName() +" in the logical plan");
-
-                log.trace("Exiting LimitClause");
-                return limit;
-        }
-}
-
-// Entry point for a boolean condition. Delegates to POrCond with the same
-// arguments and returns its result.
-ExpressionOperator PCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
-{
-	ExpressionOperator cond = null; 
-	log.trace("Entering PCond"); 
-	log.debug("PCond Input: " + input);
-}
-{
-	cond = POrCond(over,specs,lp, input)
-	{log.trace("Exiting PCond"); return cond;}
-}
-
-// One or more PAndCond operands separated by OR.
-// For each OR an LOOr operator is added to the plan and connected to the
-// condition built so far and to the new right-hand operand; the LOOr then
-// becomes the condition built so far, so the chain nests to the left.
-// Returns the single operand when there is no OR.
-ExpressionOperator POrCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
-{
-	ExpressionOperator lhsCond, rhsCond; 
-	log.trace("Entering POrCond"); 
-	log.debug("POrCond Input: " + input);
-}
-{
-	(
-	lhsCond = PAndCond(over,specs,lp,input)
-	(
-		<OR> rhsCond = PAndCond(over,specs,lp,input)
-		{
-			ExpressionOperator exprOp = new LOOr(lp, new OperatorKey(scope, getNextId()) );
-			lp.add(exprOp);
-			log.debug("POrCond: Added operator " + exprOp.getClass().getName() + " " + exprOp + " to logical plan " + lp);
-			lp.connect(lhsCond, exprOp);
-			log.debug("POrCond: Connected operator " + lhsCond.getClass().getName() + " " + lhsCond + " to " + exprOp + " logical plan " + lp);
-			lp.connect(rhsCond, exprOp);
-			log.debug("POrCond: Connected operator " + rhsCond.getClass().getName() + " " + rhsCond + " to " + exprOp + " logical plan " + lp);
-			lhsCond = exprOp;
-		}
-	)* 
-	)
-	{
-			log.trace("Exiting POrCond");
-			return lhsCond;
-	}
-}
-	
-// One or more PUnaryCond operands separated by AND.
-// For each AND an LOAnd operator is added to the plan and connected to the
-// condition built so far and to the new right-hand operand; the LOAnd then
-// becomes the condition built so far, so the chain nests to the left.
-// Returns the single operand when there is no AND.
-ExpressionOperator PAndCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
-{
-	ExpressionOperator lhsCond, rhsCond; 
-	log.trace("Entering PAndCond"); 
-	log.debug("PAndCond Input: " + input);
-}
-{
-	(
-	lhsCond = PUnaryCond(over,specs,lp,input) 
-	(
-		<AND> rhsCond = PUnaryCond(over,specs,lp,input)
-		{
-			ExpressionOperator exprOp = new LOAnd(lp, new OperatorKey(scope, getNextId()) );
-			lp.add(exprOp);
-			log.debug("PAndCond: Added operator " + exprOp.getClass().getName() + " " + exprOp + " to logical plan " + lp);
-			lp.connect(lhsCond, exprOp);
-			log.debug("PAndCond: Connected operator " + lhsCond.getClass().getName() + " " + lhsCond + " to " + exprOp + " logical plan " + lp);
-			lp.connect(rhsCond, exprOp);
-			log.debug("PAndCond: Connected operator " + rhsCond.getClass().getName() + " " + rhsCond + " to " + exprOp + " logical plan " + lp);
-			lhsCond = exprOp;
-		}
-	)*
-	)
-	{
-			log.trace("Exiting PAndCond");
-			return lhsCond;
-	}	
-}
-
-// A single condition. Alternatives, tried in this order:
-//   1. ( PCond )
-//   2. InfixExpr FILTEROP InfixExpr   -> comparison operator chosen from the
-//                                        operator text (see the switch below)
-//   3. InfixExpr MATCHES 'regex'      -> LORegexp with the unquoted string as a
-//                                        chararray constant
-//   4. EvalFuncSpec                   -> a function call used as a condition
-//   5. PNullCond                      -> IS [NOT] NULL
-//   6. PNotCond                       -> NOT ...
-// The first four use syntactic lookahead, so the order of the alternatives
-// matters. Returns the operator built by the chosen alternative.
-// The locals args and evalFunc are declared but not used in this production.
-ExpressionOperator PUnaryCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
-{
-	ExpressionOperator cond = null; 
-	ExpressionOperator lhs, rhs; 
-	Token t1; 
-	List<ExpressionOperator> args;
-    EvalFunc evalFunc = null;
-	log.trace("Entering PUnaryCond");
-}
-{
-	(
-	LOOKAHEAD("(" PCond(over,specs,lp,input) ")")
-	("(" cond = PCond(over,specs,lp,input) ")")
-|	LOOKAHEAD(InfixExpr(over,specs,lp,input) <FILTEROP>) 
-	(lhs=InfixExpr(over,specs,lp,input) t1=<FILTEROP> rhs=InfixExpr(over,specs,lp,input) 
-	{
-		//the long switch case to instantiate the right operator
-		//I have the long switch case from CompCond
-		String op = t1.image;
-		op = op.toLowerCase();
-		
-		// Up to three characters of the operator; '0' stands in for a
-		// position the operator text does not have.
-		char op1 = op.charAt(0);
-        char op2 = op.length() >= 2 ? op.charAt(1) : '0';
-        char op3 = op.length() == 3 ? op.charAt(2) : '0';
-        
-        switch (op1) {
-            // numeric ops first
-        case '=':
-            if (op2 == '=') {
-                cond = new LOEqual(lp, new OperatorKey(scope, getNextId()) );
-            } else {
-                throw new ParseException("Internal error: Invalid filter operator: " + op);
-            }
-            break;
-        case '<':
-            if (op2 == '=') {
-                cond = new LOLesserThanEqual(lp, new OperatorKey(scope, getNextId()));
-            } else {
-                cond = new LOLesserThan(lp, new OperatorKey(scope, getNextId()));
-            }
-            break;
-        case '>':
-            if (op2 == '=') {
-                cond = new LOGreaterThanEqual(lp, new OperatorKey(scope, getNextId()));
-            } else {
-                cond = new LOGreaterThan(lp, new OperatorKey(scope, getNextId()));
-            }
-            break;
-        case '!':
-            if (op2 == '=') {
-                cond = new LONotEqual(lp, new OperatorKey(scope, getNextId()));
-            } else {
-                throw new ParseException("Internal error: Invalid filter operator: " + op);
-            }
-            break;
-            // now string ops
-        case 'e':
-            if (op2 == 'q') {
-                cond = new LOEqual(lp, new OperatorKey(scope, getNextId()));
-            } else {
-                throw new ParseException("Internal error: Invalid filter operator: " + op);
-            }
-            break;
-        case 'l':
-            // Anything starting with 'l' that is not "lte" is treated as "lt".
-            if (op2 == 't' && op3 == 'e') {
-                cond = new LOLesserThanEqual(lp, new OperatorKey(scope, getNextId()));
-            } else {
-                cond = new LOLesserThan(lp, new OperatorKey(scope, getNextId()));
-            }
-            break;
-        case 'g':
-            // Anything starting with 'g' that is not "gte" is treated as "gt".
-            if (op2 == 't' && op3 == 'e') {
-                cond = new LOGreaterThanEqual(lp, new OperatorKey(scope, getNextId()));
-            } else {
-                cond = new LOGreaterThan(lp, new OperatorKey(scope, getNextId()));
-            }
-            break;
-        case 'n':
-            if (op2 == 'e' && op3 == 'q') {
-                cond = new LONotEqual(lp, new OperatorKey(scope, getNextId()));
-            } else {
-                throw new ParseException("Internal error: Invalid filter operator: " + op);
-            }
-            break;
-        default:
-            throw new ParseException("Internal error: Invalid filter operator: " + op);
-        }
-        
-        // Left operand is connected first, right operand second.
-        lp.add(cond);
-		log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp);
-		lp.connect(lhs, cond);
-		log.debug("PUnaryCond: Connected operator " + lhs.getClass().getName() + " " + lhs+ " to " + cond + " logical plan " + lp);
-		lp.connect(rhs, cond);
-		log.debug("PUnaryCond: Connected operator " + rhs.getClass().getName() + " " + rhs+ " to " + cond + " logical plan " + lp);
-	}
-	)
-|	LOOKAHEAD(InfixExpr(over,specs,lp,input) <MATCHES>) 
-		(lhs=InfixExpr(over,specs,lp,input) <MATCHES> t1=<QUOTEDSTRING> 
-			{
-                LOConst rconst = new LOConst(lp, new OperatorKey(scope, getNextId()), unquote(t1.image));
-                rconst.setType(DataType.CHARARRAY);
-				cond = new LORegexp(lp, new OperatorKey(scope, getNextId())); 
-				lp.add(rconst); 
-				lp.add(cond); 
-				log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp);
-				lp.connect(lhs, cond);
-				lp.connect(rconst, cond);
-				log.debug("PUnaryCond: Connected operator " + cond.getClass().getName() + " " + cond + " to " + lhs + " logical plan " + lp);
-			}
-		)
-	// NOTE(review): the lookahead names EvalFuncSpec with four arguments while
-	// the call passes five; presumably arguments are irrelevant inside a
-	// syntactic LOOKAHEAD -- confirm against the JavaCC documentation.
-|	LOOKAHEAD(EvalFuncSpec(over, specs, lp, input)) cond = EvalFuncSpec(over,specs,lp, input, FunctionType.EVALFUNC)
-|	cond = PNullCond(over,specs,lp,input)
-|	cond = PNotCond(over,specs,lp,input)
-
-	)
-	{log.trace("Exiting PUnaryCond"); return cond;}
-}
-
-/**
- * Parses a negated condition: NOT followed by a unary condition.
- * An LONot operator is added to the plan, the parsed operand is connected to it,
- * and the LONot is returned.
- */
-ExpressionOperator PNotCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
-{
-    ExpressionOperator operand;
-    log.trace("Entering PNotCond");
-}
-{
-    <NOT> operand = PUnaryCond(over, specs, lp, input)
-    {
-        OperatorKey notKey = new OperatorKey(scope, getNextId());
-        ExpressionOperator negation = new LONot(lp, notKey);
-        String negationClass = negation.getClass().getName();
-
-        lp.add(negation);
-        log.debug("PNotCond: Added operator " + negationClass + " " + negation + " to logical plan " + lp);
-
-        lp.connect(operand, negation);
-        log.debug("PNotCond: Connected operator " + negationClass + " " + negation + " to " + operand + " logical plan " + lp);
-
-        log.trace("Exiting PNotCond");
-        return negation;
-    }
-}
-
-/**
- * Parses "expr IS NULL" and "expr IS NOT NULL".
- * An LOIsNull is added over the parsed expression; when NOT is present the LOIsNull
- * is wrapped in an LONot. The outermost operator is returned.
- */
-ExpressionOperator PNullCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
-{
-    ExpressionOperator operand;
-    boolean negated = false;
-    log.trace("Entering PNullCond");
-}
-{
-    operand = InfixExpr(over, specs, lp, input) <IS> [ <NOT> { negated = true; } ] <NULL>
-    {
-        ExpressionOperator isNull = new LOIsNull(lp, new OperatorKey(scope, getNextId()));
-        lp.add(isNull);
-        log.debug("PNullCond: Added operator " + isNull.getClass().getName() + " " + isNull + " to logical plan " + lp);
-        lp.connect(operand, isNull);
-        log.debug("PNullCond: Connected operator " + isNull.getClass().getName() + " " + isNull + " to " + operand + " logical plan " + lp);
-
-        // Plain IS NULL: the LOIsNull itself is the result.
-        if (!negated) {
-            log.trace("Exiting PNullCond");
-            return isNull;
-        }
-
-        // IS NOT NULL: stack an LONot on top of the LOIsNull.
-        ExpressionOperator negation = new LONot(lp, new OperatorKey(scope, getNextId()));
-        lp.add(negation);
-        log.debug("PNullCond: Added operator " + negation.getClass().getName() + " " + negation + " to logical plan " + lp);
-        lp.connect(isNull, negation);
-        log.debug("PNullCond: Connected operator " + negation.getClass().getName() + " " + negation + " to " + isNull + " logical plan " + lp);
-        log.trace("Exiting PNullCond");
-        return negation;
-    }
-}
-
-/**
- * Parses the body of a (CO)GROUP: one or more comma-separated GroupItems, an optional
- * USING hint and an optional PARTITION BY clause.
- * The hint may be a single-quoted string or one of the deprecated double-quoted
- * literals "collected", "regular" or "merge". Without a usable hint result a
- * REGULAR cogroup is built through parseCogroup.
- */
-LogicalOperator CogroupClause(LogicalPlan lp) :
-{
-    ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();
-    CogroupInput gi;
-    LogicalOperator cogroup = null;
-    String partitioner = null;
-    String legacyHint = null;
-    Token t;
-    log.trace("Entering CoGroupClause");
-}
-{
-    (
-        gi = GroupItem(lp) { gis.add(gi); }
-        ( "," gi = GroupItem(lp) { gis.add(gi); } )*
-        (
-            [
-                <USING>
-                (
-                    t = <QUOTEDSTRING>
-                    { cogroup = parseUsingForGroupBy(unquote(t.image), gis, lp); }
-                |
-                    // Deprecated double-quoted spellings share one warning + dispatch.
-                    (
-                        "\"collected\"" { legacyHint = "collected"; }
-                    |   "\"regular\""   { legacyHint = "regular"; }
-                    |   "\"merge\""     { legacyHint = "merge"; }
-                    )
-                    {
-                        log.info("[WARN] Use of double-quoted string to specify hint is deprecated. Please specify hint in single quotes."); 
-                        cogroup = parseUsingForGroupBy(legacyHint, gis, lp);
-                    }
-                )
-            ]
-        )
-        ( [ <PARTITION> <BY> ( partitioner = EvalClass(ClassType.PARTITIONER) ) ] )
-    )
-    {
-        // No USING clause, or the hint handler produced nothing: fall back to a regular cogroup.
-        if (cogroup == null) {
-            cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
-        }
-        if (cogroup != null) {
-            cogroup.setCustomPartitioner(partitioner);
-        }
-        log.trace("Exiting CoGroupClause");
-        return cogroup;
-    }
-
-}
-
-/**
- * Parses one input of a JOIN: a nested expression followed by BY and either a
- * parenthesized, comma-separated list of key expressions or a single key expression.
- * Every key expression is parsed into its own freshly created LogicalPlan, and those
- * plans are collected in order.
- *
- * @param lp the logical plan handed to NestedExpr for the input relation
- * @return a CogroupInput carrying the input operator, its key plans and isInner = true
- *         (this production never clears the flag)
- */
-CogroupInput JoinItem(LogicalPlan lp) :
-{
-    LogicalOperator cgOp; 
-    boolean isInner = true;
-    ArrayList<LogicalPlan> listPlans = new ArrayList<LogicalPlan>(); 
-    LogicalPlan groupByPlan;
-    ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
-    ArrayList<Schema> userDefinedSchemaList = new ArrayList<Schema>();
-    log.trace("Entering JoinItem");
-    log.debug("LogicalPlan: " + lp);
-}
-{
-    (
-        cgOp = NestedExpr(lp)
-        (
-            ( <BY> 
-                ( 
-                    // Syntactic lookahead decides between "(k1, k2, ...)" and a single key
-                    // that merely starts with a parenthesis.
-                    LOOKAHEAD ( "(" FlattenedGenerateItemList(cgOp.getSchema(), null, groupByPlan, cgOp) ")" )
-                    ( "(" FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) 
-                        {listPlans.add(groupByPlan);}
-                        (
-                            "," FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) 
-                            {listPlans.add(groupByPlan);}
-                        )*
-                        ")" 
-                    )
-                |   (
-                        FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) 
-                        {listPlans.add(groupByPlan);}
-                    )
-                )
-            )   
-        )        
-    )
-    {
-        CogroupInput cogroupInput = new CogroupInput(); 
-
-        cogroupInput.plans = listPlans;
-        cogroupInput.op = cgOp;
-        cogroupInput.isInner = isInner;
-        
-        // Was "Exiting GroupItem" (copied from GroupItem), which made traces misleading.
-        log.trace("Exiting JoinItem");     
-        return cogroupInput;
-    }
-}
-
-/**
- * Parses one input of a GROUP/COGROUP: a nested expression followed by one of
- *   BY (k1, k2, ...)  - each key parsed into its own new LogicalPlan,
- *   BY k              - a single key plan,
- *   ALL               - a plan holding the chararray constant "all",
- *   ANY               - a plan holding an LOUserFunc over GFAny,
- * and an optional INNER or OUTER keyword.
- *
- * Returns a CogroupInput holding the input operator, the key plans in parse order
- * and the isInner flag (true only when INNER was given).
- */
-CogroupInput GroupItem(LogicalPlan lp) : 
-{
-	ExpressionOperator es; 
-	LogicalOperator cgOp; 
-	boolean isInner = false; 
-	ArrayList<LogicalPlan> listPlans = new ArrayList<LogicalPlan>(); 
-	LogicalPlan groupByPlan;
-	ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
-	ArrayList<Schema> userDefinedSchemaList = new ArrayList<Schema>();
-	log.trace("Entering GroupItem");
-	log.debug("LogicalPlan: " + lp);
-}
-{
-	(
-		cgOp = NestedExpr(lp)
-		(
-			( <BY> 
-				( 
-					LOOKAHEAD ( "(" FlattenedGenerateItemList(cgOp.getSchema(), null, groupByPlan, cgOp) ")" ) // syntactic lookahead: parenthesized key list vs. single key
-					( "(" es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) 
-						{listPlans.add(groupByPlan);}
-						(
-							"," es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) 
-							{listPlans.add(groupByPlan);}
-						)*
-						")" 
-					)
-				|	(
-						es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) 
-						{listPlans.add(groupByPlan);}
-					)
-				)
-			)	
-		|	<ALL> {
-					es = new LOConst(groupByPlan = new LogicalPlan(), new OperatorKey(scope, getNextId()), "all"); // key is the constant "all"
-                    es.setType(DataType.CHARARRAY);
-					groupByPlan.add(es);
-					log.debug("GroupItem: Added operator " + es.getClass().getName() + " " + es + " to logical plan " + groupByPlan);
-					listPlans.add(groupByPlan);
-			}
-		|	<ANY> {
-					es = new LOUserFunc(groupByPlan = new LogicalPlan(), new OperatorKey(scope, getNextId()), new FuncSpec(GFAny.class.getName()), DataType.INTEGER); // key is produced by the GFAny function
-					groupByPlan.add(es);
-					log.debug("GroupItem: Added operator " + es.getClass().getName() + " " + es + " to logical plan " + groupByPlan);
-					listPlans.add(groupByPlan);
-			}
-		)
-		[<INNER> {isInner = true;} | <OUTER>] // OUTER is accepted but leaves the default (isInner == false)
-	)
-	{
-		CogroupInput cogroupInput = new CogroupInput(); 
-
-		cogroupInput.plans = listPlans;
-		cogroupInput.op = cgOp;
-		cogroupInput.isInner = isInner;
-		
-		log.trace("Exiting GroupItem");		
-		return cogroupInput;
-    }
-}
-
-/**
- * Parses the body of ORDER: a nested expression, BY, then either a comma-separated
- * list of sort columns or "*" with an optional ASC/DESC, and finally an optional
- * USING comparison function.
- * Builds an LOSort from the collected column plans and directions, adds it to the
- * plan, connects the input to it and returns it.
- */
-LogicalOperator OrderClause(LogicalPlan lp) :
-{
-    LogicalOperator op;
-    ExpressionOperator col;
-    boolean star = false;
-    boolean asc = true;
-    ArrayList<ExpressionOperator> sortCols = new ArrayList<ExpressionOperator>();
-    ArrayList<LogicalPlan> sortColPlans = new ArrayList<LogicalPlan>();
-    ArrayList<Boolean> ascOrder = new ArrayList<Boolean>();
-    String funcName = null;
-    Token t1;
-    FuncSpec funcSpec = null;
-    log.trace("Entering OrderClause");
-}
-{
-    (
-        op = NestedExpr(lp) <BY>
-        (
-            col = SortCol(op.getSchema(), lp, op, ascOrder, sortColPlans)
-            ( "," col = SortCol(op.getSchema(), lp, op, ascOrder, sortColPlans) )*
-        |
-            <STAR> { star = true; } [ <ASC> | <DESC> { asc = false; } ]
-            {
-                // ORDER BY *: a single star projection stands in for the column list.
-                LogicalPlan starPlan = new LogicalPlan();
-                LOProject starProject = new LOProject(starPlan, new OperatorKey(scope, getNextId()), op, -1);
-                starProject.setStar(true);
-                starPlan.add(starProject);
-                sortColPlans.add(starPlan);
-                log.debug("Set star to true");
-                ascOrder.add(asc);
-            }
-        )
-        ( <USING> funcSpec = NonEvalFuncSpec(FunctionType.COMPARISONFUNC) )?
-    )
-    {
-        OperatorKey sortKey = new OperatorKey(scope, getNextId());
-        LOSort sortOp = new LOSort(lp, sortKey, sortColPlans, ascOrder, funcSpec);
-        sortOp.setStar(star);
-        sortOp.setLimit(-1);
-
-        lp.add(sortOp);
-        log.debug("Added operator " + sortOp.getClass().getName() + " to the logical plan");
-
-        lp.connect(op, sortOp);
-        log.debug("Connecting sort input alias " + op.getAlias() + " operator " + op.getClass().getName() + " to operator " + sortOp.getClass().getName() + " in the logical plan");
-
-        log.trace("Exiting OrderClause");
-        return sortOp;
-    }
-}
-
-
-/**
- * Parses one sort column: a column name or position, optionally wrapped in
- * parentheses, followed by an optional ASC or DESC.
- * Appends the direction to ascOrder (true unless DESC was given) and a new
- * single-projection plan to sortColPlans, then returns the projection.
- */
-ExpressionOperator SortCol(Schema over, LogicalPlan lp, LogicalOperator op, ArrayList<Boolean> ascOrder, ArrayList<LogicalPlan> sortColPlans) :
-{
-    int columnIndex;
-    boolean ascending = true;
-    LogicalPlan columnPlan = new LogicalPlan();
-    log.trace("Entering SortCol");
-}
-{
-    (
-        columnIndex = ColNameOrNum(op.getSchema())
-    |   "(" columnIndex = ColNameOrNum(op.getSchema()) ")"
-    )
-    [ <ASC> | <DESC> { ascending = false; } ]
-    {
-        log.debug(ascending ? "Ascending" : "Descending");
-        ascOrder.add(ascending);
-
-        ExpressionOperator column = new LOProject(columnPlan, new OperatorKey(scope, getNextId()), op, columnIndex);
-        columnPlan.add(column);
-        sortColPlans.add(columnPlan);
-
-        log.trace("Exiting SortCol");
-        return column;
-    }
-}
-
-/**
- * Resolves a column reference to a column position.
- * A positional reference (DOLLARVAR) is converted with undollar; a name
- * (IDENTIFIER, or the keyword GROUP used as a column name) is looked up in the schema.
- *
- * @param over schema used to resolve names; may be null, in which case any name is rejected
- * @return the column position
- * @throws ParseException if the name is not found in the schema (or the schema is null),
- *         or if the schema lookup raises a FrontendException (kept as the cause)
- */
-int ColNameOrNum(Schema over) : 
-{
-    Token t; 
-    // Declared once for the whole production: previously the GROUP alternative used an
-    // "i" that was only declared inside the IDENTIFIER alternative's action block.
-    int i;
-    log.trace("Entering ColNameOrNum");
-}
-{
-    (
-    t = <DOLLARVAR>
-    {
-        log.trace("Exiting ColNameOrNum");
-        return undollar(t.image);
-    }
-    |
-    // IDENTIFIER and GROUP are resolved identically, so they share one action.
-    ( t = <IDENTIFIER> | t = <GROUP> )
-    {
-        try {
-            if ( over == null ||  (i = over.getPosition(t.image)) == -1) {
-                throw new ParseException("Invalid alias: " + t.image + " in " + over);
-            } 
-        } catch (FrontendException fee) {
-            ParseException pe = new ParseException(fee.getMessage());
-            pe.initCause(fee);
-            throw pe;
-        }
-
-        log.trace("Exiting ColNameOrNum");
-        return i;
-    }
-    )
-}		
-
-/**
- * Parses the body of CROSS: two or more comma-separated nested expressions and an
- * optional PARTITION BY clause.
- * Adds an LOCross to the plan, sets its custom partitioner (possibly null),
- * connects every input to it and returns it.
- */
-LogicalOperator CrossClause(LogicalPlan lp) :
-{
-    LogicalOperator op;
-    String partitioner = null;
-    ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>();
-    log.trace("Entering CrossClause");
-}
-{
-    (
-        op = NestedExpr(lp) { inputs.add(op); }
-        ( "," op = NestedExpr(lp) { inputs.add(op); } )+
-    )
-    ( [ <PARTITION> <BY> ( partitioner = EvalClass(ClassType.PARTITIONER) ) ] )
-    {
-        OperatorKey crossKey = new OperatorKey(scope, getNextId());
-        LogicalOperator crossOp = new LOCross(lp, crossKey);
-        lp.add(crossOp);
-        crossOp.setCustomPartitioner(partitioner);
-        log.debug("Added operator " + crossOp.getClass().getName() + " to the logical plan");
-
-        for (LogicalOperator source : inputs) {
-            lp.connect(source, crossOp);
-            log.debug("Connected operator " + source.getClass().getName() + " " + source + " to " + crossOp + " logical plan " + lp);
-        }
-        log.debug("Connected cross inputs to the cross operator");
-
-        log.trace("Exiting CrossClause");
-        return crossOp;
-    }
-}
-/**
- * Parses the body of JOIN: a JoinItem, an optional LEFT/RIGHT/FULL [OUTER] marker,
- * one or more further comma-separated JoinItems, an optional USING algorithm hint
- * and an optional PARTITION BY clause.
- *
- * Outer joins with more than two inputs are rejected with a ParseException.
- * For outer joins the isInner flags of the two inputs are cleared as needed before
- * the join operator is built. Without a USING clause a HASH join is created via
- * parseJoin. A custom partitioner combined with a SKEWED join raises a ParseException.
- */
-LogicalOperator JoinClause(LogicalPlan lp) : 
-{
-	CogroupInput gi; 
-	ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>(); 
-	log.trace("Entering JoinClause");
-	log.debug("LogicalPlan: " + lp);
-	LogicalOperator joinOp = null;
-	boolean isLeftOuter = false;
-	boolean isRightOuter = false;
-	boolean isFullOuter = false;
-	boolean isOuter = false;
-	Token t;
-	String partitioner = null;
-}
-{
-	(gi = JoinItem(lp) { gis.add(gi); }
-	[
-	    (<LEFT> [<OUTER>] { isLeftOuter = true;})
-        |
-        (<RIGHT> [<OUTER>] {isRightOuter = true;})
-        |
-        (<FULL> [<OUTER>] {isFullOuter = true;})
-	]
-	("," gi = JoinItem(lp) { gis.add(gi); })+
-	{
-		// in the case of outer joins, only two
-		// inputs are allowed
-		isOuter = (isLeftOuter || isRightOuter || isFullOuter);
-		if(isOuter && gis.size() > 2) {
-		  throw new ParseException("(left|right|full) outer joins are only supported for two inputs");  
-		}
-		
-		// for an outer join we now have exactly two inputs
-		
-		// the semantics of "outer"
-        // for join are different from cogroup
-        // cogroup a by $0 inner, b by $0 outer means keep
-        // all keys from a and for cases where there is no match
-        // from b have an empty bag for b. For keys in b which
-        // do not match in a, there will be no output records.
-        // Whereas with join,
-        // join a by $0 inner, b by $0 outer implies right outer
-        // join which has the exact opposite semantics - for
-        // all keys in b which do not have a match in a we need to
-        // output null for fields in a. For keys in a which do not
-        // match in b, no record should be output. Since we will be
-        // using the same underlying implementation for outer join
-        // as cogroup we should achieve join semantics by setting the
-        // isinner flag accordingly
-        if (isLeftOuter) {
-            gis.get(1).isInner = false;
-        } else if (isRightOuter) {
-            gis.get(0).isInner = false;
-        } else if (isFullOuter) {
-            gis.get(0).isInner = false;
-            gis.get(1).isInner = false;
-        }
-		
-	}
-	// For all types of join we create LOJoin and mark what type of join it is.
-	// The double-quoted literals below are deprecated spellings of the same hints.
-	([<USING> (
-		  (t = <QUOTEDSTRING> { joinOp = parseUsingForJoin(unquote(t.image), gis, lp, isFullOuter, isRightOuter, isOuter);})
-        | ("\"repl\"" | "\"replicated\"")  {
-		      log.info("[WARN] Use of double-quotes for specifying join algorithm is deprecated. Please use single quotes."); 
-              joinOp = parseUsingForJoin("replicated", gis, lp, isFullOuter, isRightOuter, isOuter);
-		  }
-	    | ("\"skewed\"") {
-              log.info("[WARN] Use of double-quotes for specifying join algorithm is deprecated. Please use single quotes."); 
-              joinOp = parseUsingForJoin("skewed", gis, lp, isFullOuter, isRightOuter, isOuter);
-		    	}
-        | ("\"merge\"") { 
-            log.info("[WARN] Use of double-quotes for specifying join algorithm is deprecated. Please use single quotes."); 
-            joinOp = parseUsingForJoin("merge", gis, lp, isFullOuter, isRightOuter, isOuter);
-        	}
-	    | ("\"hash\"" | "\"default\"") {
-		    log.info("[WARN] Use of double-quotes for specifying join algorithm is deprecated. Please use single quotes."); 
-            joinOp = parseUsingForJoin("hash", gis, lp, isFullOuter, isRightOuter, isOuter);
-		    	}
-     )]))
-     ([<PARTITION> <BY> (partitioner = EvalClass(ClassType.PARTITIONER))])
-	
-	{
-		log.trace("Exiting JoinClause"); // note: logged before the remaining checks below run
-		if (joinOp == null) {
-			joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.HASH); // no USING clause (or no operator produced): default to hash join
-		}
-		if(partitioner != null) {
-			if(((LOJoin)joinOp).getJoinType() == LOJoin.JOINTYPE.SKEWED) {
-				throw new ParseException("Custom Partitioner is not supported for skewed join");
-			}
-			else {
-				joinOp.setCustomPartitioner(partitioner);
-			}
-		}
-		return joinOp;
-	}
-}
-
-/**
- * Parses the body of UNION: an optional ONSCHEMA keyword followed by two or more
- * comma-separated nested expressions.
- * Adds an LOUnion to the plan, records whether ONSCHEMA was given, connects every
- * input to it and returns it.
- *
- * @param lp the logical plan being built
- * @return the new LOUnion operator
- * @throws ParseException wrapping any exception raised while building the union;
- *         the original exception is kept as the cause and its message is preserved
- */
-LogicalOperator UnionClause(LogicalPlan lp) : 
-{
-    LogicalOperator op;
-    boolean isOnSchema = false;
-    ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>(); 
-    log.trace("Entering UnionClause");
-}
-{
-    [<ONSCHEMA> {isOnSchema = true;}]
-    (op = NestedExpr(lp){inputs.add(op);}
-    ("," op = NestedExpr(lp) {inputs.add(op);})+)
-    {
-            try{// this try-catch block will catch all exceptions and convert them
-                // to ParseException. Otherwise, if any exception other than ParseException
-                // is thrown, the generated parse code tries to cast
-                // the exception to Error, resulting in a misleading error message
-                LOUnion union = new LOUnion(lp, new OperatorKey(scope, getNextId()));
-                union.setOnSchema(isOnSchema);
-                lp.add(union);
-
-                log.debug("Added operator " + union.getClass().getName() + " to the logical plan");
-
-                for (LogicalOperator lop: inputs) {
-                    lp.connect(lop, union);
-                    // Target of the connection is the union; the message used to print
-                    // the input operator's class on both sides.
-                    log.debug("Connected union input operator " +
-                            lop.getClass().getName() + " to operator " +
-                            union.getClass().getName() + " in the logical plan");
-                }
-
-                log.trace("Exiting UnionClause");
-                return union;
-            }
-            catch(Exception e){
-                // Carry the cause's message so the reported error is not empty,
-                // matching how the other clauses wrap exceptions.
-                ParseException pe = new ParseException(e.getMessage());
-                pe.initCause(e);
-                throw pe;           
-            }
-    }
-
-}
-/**
- * Parses the body of FOREACH: a nested expression followed by a nested block.
- *
- * The last operator returned by NestedBlock is taken to be the LOGenerate. Each of
- * its generate plans is processed in turn:
- *   1. non-projection expression roots for which checkGenerateInput is true are
- *      attached to the foreach plan via attachPlan;
- *   2. projection roots whose input satisfies checkGenerateInput, or is not the
- *      foreach input, get that input added to the generate plan, connected and attached;
- *   3. the plan is cloned, redundant operators are removed from the clone, and the
- *      clone becomes one of the foreach's nested plans.
- * Finally the generate state is reset and an LOForEach is added to the plan and
- * connected to the input.
- *
- * Clone, visitor and plan failures are rethrown as ParseException with the cause set.
- */
-    LogicalOperator ForEachClause(LogicalPlan lp) : 
-{
-	ArrayList<LogicalOperator> specList = new ArrayList<LogicalOperator>(); 
-	LogicalOperator input, foreach; 
-	LogicalPlan foreachPlan = new LogicalPlan();
-    ArrayList<LogicalPlan> foreachPlans = new ArrayList<LogicalPlan>();
-	log.trace("Entering ForEachClause");
-}
-{
-	(
-	input = NestedExpr(lp)
-	specList = NestedBlock(input.getSchema(), specList, foreachPlan, input)
-	)
-	{
-        LOGenerate generate = (LOGenerate)specList.get(specList.size() - 1); // the generate is expected to be the last spec
-        List<LogicalPlan> generatePlans = generate.getGeneratePlans();
-        List<Boolean> flattenList = generate.getFlatten();
-        List<Schema> userDefinedSchemaList = generate.getUserDefinedSchema();
-        /*
-        Generate's nested plans will be translated to foreach's nested plan
-        If generate contains an expression that does not require generate's
-        inputs then it should be made part of foreach without the generate
-        For the remaining expressions, the entire DAG till generate has to be
-        duplicated and then the nested plan attached to a new generate
-        */
-
-        for (int planCtr = 0; planCtr < generatePlans.size(); ++planCtr) {
-            LogicalPlan generatePlan = generatePlans.get(planCtr);
-            List<LogicalOperator> planRoots = new ArrayList<LogicalOperator>(generatePlan.getRoots()); // iterate over a copy of the roots
-            boolean needGenerateInput = false;
-            boolean needForEachInput = false;
-            MultiMap<LogicalOperator, LogicalOperator> mapProjectInputs = null;
-            Map<LogicalOperator, Boolean> rootProcessed = new HashMap<LogicalOperator, Boolean>();
-            for(LogicalOperator root: planRoots) {
-                if(root instanceof ExpressionOperator && !(root instanceof LOProject)) {
-                    if(checkGenerateInput(root)) {
-                        needGenerateInput = true; // overwritten below before it is read
-                        attachPlan(generatePlan, root, foreachPlan, rootProcessed);
-                        rootProcessed.put(root, true);
-                    }
-                }
-            }
-            
-            planRoots = generatePlan.getRoots(); // re-read the roots after the first pass
-            needGenerateInput = false;
-            needForEachInput = false;
-
-            for(LogicalOperator root: planRoots) {
-                if(root instanceof LOProject) {
-                    LOProject project = (LOProject)root;
-                    LogicalOperator projectInput = project.getExpression();
-                    if(checkGenerateInput(projectInput) || !(projectInput.equals(input))) {
-                        needGenerateInput = true;
-                        if(null == mapProjectInputs) {
-                            mapProjectInputs = new MultiMap<LogicalOperator, LogicalOperator>();
-                        }
-                        mapProjectInputs.put(root, projectInput);
-                    } else {
-                        needForEachInput = true; // assigned but not read anywhere in this production
-                    }
-                } 
-            }
-            if(needGenerateInput) { // implies mapProjectInputs was created in the loop above
-                /*
-                Duplicate the logical plan until the generate but excluding generate
-                Create a new generate operator with the plan being iterated on
-                Attach the generate as the leaf and add the duplicated plan to
-                the list of foreach plans
-                */
-
-                for(LogicalOperator project: mapProjectInputs.keySet()) {
-                    for(LogicalOperator projectInput: mapProjectInputs.get(project)) {
-                        generatePlan.add(projectInput);
-                        generatePlan.connect(projectInput, project);
-                        attachPlan(generatePlan, projectInput, foreachPlan, rootProcessed);
-                        rootProcessed.put(projectInput, true);
-                    }
-                }
-            }
-            LogicalPlanCloner lpCloner = new LogicalPlanCloner(generatePlan);
-            LogicalPlan generatePlanClone;
-            try {
-                generatePlanClone = lpCloner.getClonedPlan();
-            } catch (CloneNotSupportedException cnse) {
-                ParseException pe = new ParseException("Not able to clone foreach plan");
-                pe.initCause(cnse);
-                throw pe;
-            }
-            RemoveRedundantOperators removeOperators = new RemoveRedundantOperators(generatePlanClone);
-            try {
-                removeOperators.visit();
-            } catch (VisitorException ve) {
-            	ParseException pe = new ParseException("Could not remove redundant operators in foreach plan.");
-                pe.initCause(ve);
-                throw pe;
-            }
-            foreachPlans.add(generatePlanClone); // the cleaned-up clone is what the foreach keeps
-        }
-
-		resetGenerateState();
-		foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), (ArrayList)foreachPlans, (ArrayList)flattenList, (ArrayList) userDefinedSchemaList); // raw casts: the getters return List
-		try {
-			lp.add(foreach);
-			log.debug("Added operator " + foreach.getClass().getName() + " to the logical plan");
-		
-			lp.connect(input, foreach);
-			log.debug("Connected alias " + input.getAlias() + " operator " + input.getClass().getName() + " object " + input + " to operator " + foreach.getClass().getName() + " in the logical plan");
-		} catch (PlanException planException) {
-			ParseException pe = new ParseException(planException.getMessage());
-			pe.initCause(planException);
-			throw pe;
-		}
-		
-		log.trace("Exiting ForEachClause");
-		return foreach;
-	}
-}
-
-/**
- * Parses the body of STREAM: a nested expression, THROUGH, and a command.
- * Builds an LOStream over the input using an executable manager obtained from the
- * pig context, adds it to the plan, connects the input to it and returns it.
- */
-LogicalOperator StreamClause(LogicalPlan lp) :
-{
-    LogicalOperator input;
-    StreamingCommand command;
-}
-{
-    input = NestedExpr(lp) <THROUGH> command = Command()
-    {
-        OperatorKey streamKey = new OperatorKey(scope, getNextId());
-        LOStream streamOp = new LOStream(lp, streamKey, input, pigContext.createExecutableManager(), command);
-
-        lp.add(streamOp);
-        lp.connect(input, streamOp);
-        return streamOp;
-    }
-}
-
-/**
- * Parses a streaming command, given either inline (EXECCOMMAND) or as an alias
- * (IDENTIFIER) previously registered with the pig context.
- * An inline command is unquoted, split into arguments and checked for auto-ship
- * specs; an unknown alias raises a ParseException.
- */
-StreamingCommand Command() :
-{
-    Token t;
-    StreamingCommand command;
-}
-{
-    t = <EXECCOMMAND>
-    {
-        String commandLine = unquote(t.image);
-        String[] argv = splitArgs(commandLine);
-        command = new StreamingCommand(pigContext, argv);
-        checkAutoShipSpecs(command, argv);
-        return command;
-    }
-    |
-    t = <IDENTIFIER>
-    {
-        command = pigContext.getCommandForAlias(t.image);
-        if (command != null) {
-            return command;
-        }
-        throw new ParseException("Undefined command-alias: " + t.image + 
-                                 " used as stream operator");
-    }
-}
-
-LogicalOperator DefineClause(LogicalPlan lp) : {Token t; Token cmd; String functionName, functionArgs;}
-{
-    t = <IDENTIFIER>
-    (
-    ( 
-        cmd = <EXECCOMMAND>
-        {
-            StreamingCommand command = 
-               new StreamingCommand(pigContext, splitArgs(unquote(cmd.image)));
-            String[] paths;
-            StreamingCommand.HandleSpec[] handleSpecs;
-        }
-        (
-            <SHIP> "(" paths = PathList() ")" 
-            {
-                if (paths.length == 0) {
-                	command.setShipFiles(false);
-                } else {
-                    for (String path : paths) {
-                    	try {
-                            command.addPathToShip(path);
-                        } catch(IOException e) {
-                        	ParseException pe = new ParseException(e.getMessage());
-                        	pe.initCause(e); 
-                            throw pe;
-                        }
-                    }
-                }
-            }
-            |
-            <CACHE> "(" paths = PathList() ")"
-            {
-                for (String path : paths) {
-                    try {
-                        command.addPathToCache(path);
-                    } catch(IOException e) {
-                    	ParseException pe = new ParseException(e.getMessage());
-                    	pe.initCause(e); 
-                        throw pe;
-                    }
-                }
-            }
-            |
-            <INPUT> "(" InputOutputSpec(command, StreamingCommand.Handle.INPUT) ")"
-            |

[... 1717 lines stripped ...]