You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the word "here", which is not preserved in this plain-text version.
Posted to commits@pig.apache.org by th...@apache.org on 2011/01/04 23:33:16 UTC

svn commit: r1055216 - in /pig/trunk: src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/logical/relational/ src/org/apache/pig/parser/ test/org/apache/pig/parser/

Author: thejas
Date: Tue Jan  4 22:33:16 2011
New Revision: 1055216

URL: http://svn.apache.org/viewvc?rev=1055216&view=rev
Log:
PIG-1618: Switch to new parser generator technology - NewParser-10.patch - (xuefuz via thejas)

Modified:
    pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
    pig/trunk/src/org/apache/pig/parser/AstValidator.g
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
    pig/trunk/src/org/apache/pig/parser/QueryLexer.g
    pig/trunk/src/org/apache/pig/parser/QueryParser.g
    pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
    pig/trunk/test/org/apache/pig/parser/TestParser.pig

Modified: pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java Tue Jan  4 22:33:16 2011
@@ -130,8 +130,7 @@ public class LogicalPlanMigrationVistor 
         
         org.apache.pig.newplan.logical.relational.LOCogroup newCogroup =
             new org.apache.pig.newplan.logical.relational.LOCogroup
-            (logicalPlan, newExpressionPlans, grouptype, cg.getInner(), 
-                    cg.getRequestedParallelism() );
+            (logicalPlan, newExpressionPlans, grouptype, cg.getInner());
         
         for( int i = 0; i < inputs.size(); i++ ) {
             ArrayList<LogicalPlan> plans = 

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Tue Jan  4 22:33:16 2011
@@ -76,11 +76,11 @@ public class LOCogroup extends LogicalRe
         
     public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan> 
     expressionPlans, boolean[] isInner ) {
-        this( plan, expressionPlans, GROUPTYPE.REGULAR, isInner, -1 );
+        this( plan, expressionPlans, GROUPTYPE.REGULAR, isInner );
     }
 
     public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan> 
-    expressionPlans, GROUPTYPE groupType, boolean[] isInner, int requestedParrellism) {
+    expressionPlans, GROUPTYPE groupType, boolean[] isInner) {
         super("LOCogroup", plan);
         this.mExpressionPlans = expressionPlans;
         if( isInner != null ) {

Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Tue Jan  4 22:33:16 2011
@@ -84,10 +84,18 @@ private Set<String> aliases = new HashSe
 query : ^( QUERY statement* )
 ;
 
-statement : general_statement | foreach_statement
+statement : general_statement
+          | foreach_statement
+          | split_statement
 ;
 
-general_statement : ^( STATEMENT ( alias { aliases.add( $alias.name ); } )? op_clause INTEGER? )
+split_statement : split_clause
+;
+
+general_statement : ^( STATEMENT ( alias { aliases.add( $alias.name ); } )? op_clause parallel_clause? )
+;
+
+parallel_clause : ^( PARALLEL INTEGER )
 ;
 
 // We need to handle foreach specifically because of the ending ';', which is not required 
@@ -130,10 +138,7 @@ path_list : QUOTEDSTRING+
 cache_caluse : ^( CACHE path_list )
 ;
 
-input_clause : ^( INPUT stream_cmd_list )
-;
-
-stream_cmd_list : stream_cmd+
+input_clause : ^( INPUT stream_cmd+ )
 ;
 
 stream_cmd : ^( STDIN func_clause? )
@@ -141,13 +146,10 @@ stream_cmd : ^( STDIN func_clause? )
            | ^( QUOTEDSTRING func_clause? )
 ;
 
-output_clause : ^( OUTPUT stream_cmd_list )
+output_clause : ^( OUTPUT stream_cmd+ )
 ;
 
-error_clause : ^( ERROR error_cmd? )
-;
-
-error_cmd : ^( QUOTEDSTRING INTEGER? )
+error_clause : ^( ERROR  QUOTEDSTRING INTEGER? )
 ;
 
 load_clause : ^( LOAD filename func_clause? as_clause? )
@@ -194,7 +196,7 @@ map_type : MAP_TYPE
 ;
 
 func_clause : ^( FUNC func_name func_args? )
-            | ^( FUNC func_alias )
+            | ^( FUNC_REF func_alias )
 ;
 
 func_name : eid ( ( PERIOD | DOLLAR ) eid )*
@@ -206,11 +208,11 @@ func_alias : IDENTIFIER
 func_args : QUOTEDSTRING+
 ;
 
-group_clause : ^( GROUP group_item_list QUOTEDSTRING? )
-             | ^( COGROUP group_item_list QUOTEDSTRING? )
+group_clause : ^( GROUP group_item+ group_type? )
+             | ^( COGROUP group_item+ group_type? )
 ;
 
-group_item_list : group_item+
+group_type : HINT_COLLECTED | HINT_MERGE | HINT_REGULAR
 ;
 
 group_item : rel ( join_group_by_clause | ALL | ANY ) ( INNER | OUTER )?
@@ -220,7 +222,8 @@ rel : alias {  validateAliasRef( aliases
     | op_clause
 ;
 
-flatten_generated_item : ( flatten_clause | expr | STAR ) as_clause?
+flatten_generated_item : flatten_clause as_clause?
+                       | ( expr | STAR ) field[new HashSet<String>()]?
 ;
 
 flatten_clause : ^( FLATTEN expr )
@@ -305,7 +308,7 @@ rel_list : rel+
 join_clause : ^( JOIN join_sub_clause join_type? partition_clause? )
 ;
 
-join_type : JOIN_TYPE_REPL | JOIN_TYPE_MERGE | JOIN_TYPE_SKEWED | JOIN_TYPE_DEFAULT
+join_type : HINT_REPL | HINT_MERGE | HINT_SKEWED | HINT_DEFAULT
 ;
 
 join_sub_clause : join_item ( LEFT | RIGHT | FULL ) OUTER? join_item
@@ -324,29 +327,26 @@ join_group_by_expr : expr | STAR
 union_clause : ^( UNION ONSCHEMA? rel_list )
 ;
 
-foreach_clause : ^( FOREACH rel nested_plan )
+foreach_clause : ^( FOREACH rel foreach_plan )
 ;
 
-nested_plan : ^( NESTED_PLAN foreach_blk )
-            | ^( NESTED_PLAN generate_clause )
+foreach_plan : ^( FOREACH_PLAN nested_blk )
+             | ^( FOREACH_PLAN generate_clause parallel_clause? )
 ;
 
-foreach_blk : nested_command_list generate_clause
+nested_blk
+scope { Set<String> ids; }
+@init{ $nested_blk::ids = new HashSet<String>(); }
+ : nested_command* generate_clause
 ;
 
 generate_clause : ^( GENERATE flatten_generated_item+ )
 ;
 
-nested_command_list
-scope { Set<String> ids; }
-@init{ $nested_command_list::ids = new HashSet<String>(); }
- : nested_command*
-;
-
 nested_command
  : ^( NESTED_CMD IDENTIFIER ( expr | nested_op ) )
    {
-       $nested_command_list::ids.add( $IDENTIFIER.text );
+       $nested_blk::ids.add( $IDENTIFIER.text );
    }
 ;
 
@@ -366,7 +366,7 @@ col_ref_list : col_ref+
 nested_alias_ref
  : IDENTIFIER
    {
-       validateAliasRef( $nested_command_list::ids, $IDENTIFIER.text );
+       validateAliasRef( $nested_blk::ids, $IDENTIFIER.text );
    }
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Tue Jan  4 22:33:16 2011
@@ -18,19 +18,32 @@
 
 package org.apache.pig.parser;
 
+import java.io.IOException;
+import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
+import org.antlr.runtime.RecognitionException;
+import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.RANDOM;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.StringUtils;
 import org.apache.pig.newplan.logical.expression.ConstantExpression;
@@ -43,17 +56,22 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOCross;
 import org.apache.pig.newplan.logical.relational.LODistinct;
 import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LONative;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
 import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOStream;
 import org.apache.pig.newplan.logical.relational.LOUnion;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE;
 import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
 import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
 import org.apache.pig.newplan.Operator;
@@ -62,14 +80,26 @@ public class LogicalPlanBuilder {
     private LogicalPlan plan = new LogicalPlan();
 
     private Map<String, Operator> operators = new HashMap<String, Operator>();
-    private Map<String, FuncSpec> functions = new HashMap<String, FuncSpec>();
     
+    // TODO: hook up with the real PigContext instance instead of creating one here.
+    private PigContext pigContext = new PigContext( ExecType.LOCAL, new Properties() );
+    
+    private static NodeIdGenerator nodeIdGen = NodeIdGenerator.getGenerator();
+
     FuncSpec lookupFunction(String alias) {
-        return functions.get( alias );
+        return pigContext.getFuncSpecFromAlias( alias );
+    }
+    
+    StreamingCommand lookupCommand(String alias) {
+        return pigContext.getCommandForAlias( alias );
+    }
+    
+    void defineCommand(String alias, StreamingCommand command) {
+        pigContext.registerStreamCmd( alias, command );
     }
     
     void defineFunction(String alias, FuncSpec fs) {
-        functions.put( alias, fs );
+        pigContext.registerFunction( alias, fs );
     }
     
     LogicalPlan getPlan() {
@@ -106,9 +136,9 @@ public class LogicalPlanBuilder {
         return buildOp( op, alias, parallel, inputAliases, null );
     }
 
-    String buildSplitOp(String alias, Integer parallel, String inputAlias) {
+    String buildSplitOp(String inputAlias) {
         LOSplit op = new LOSplit( plan );
-        return buildOp( op, alias, parallel, inputAlias, null );
+        return buildOp( op, null, null, inputAlias, null );
     }
     
     String buildSplitOutputOp(String alias, Integer parallel, String inputAlias, LogicalExpressionPlan filterPlan) {
@@ -137,9 +167,14 @@ public class LogicalPlanBuilder {
         return buildOp( op, alias, parallel, inputAliases, partitioner );
     }
 
-    String buildGroupOp(String alias, Integer parallel, String inputAlias, List<LogicalExpressionPlan> plans) {
-        LOCogroup op = new LOCogroup( plan );
-        return buildOp( op, alias, parallel, inputAlias, null );
+    String buildGroupOp(String alias, Integer parallel, List<String> inputAliases, 
+        MultiMap<Integer, LogicalExpressionPlan> expressionPlans, GROUPTYPE gt, List<Boolean> innerFlags) {
+        boolean[] flags = new boolean[innerFlags.size()];
+        for( int i = 0; i < innerFlags.size(); i++ ) {
+            flags[i] = innerFlags.get( i );
+        }
+        LOCogroup op = new LOCogroup( plan, expressionPlans, gt, flags );
+        return buildOp( op, alias, parallel, inputAliases, null );
     }
     
     String buildLoadOp(String alias, Integer parallel, String filename, FuncSpec funcSpec, LogicalSchema schema) {
@@ -176,14 +211,104 @@ public class LogicalPlanBuilder {
         return buildOp( op, alias, parallel, inputAlias, null );
     }
     
-    static void setAlias(LogicalRelationalOperator op, String alias) {
-        if( alias != null ) {
-            op.setAlias( alias );
-        } else {
-            // TODO: generate an alias.
+    String buildForeachOp(String alias, Integer parallel, String inputAlias, LogicalPlan innerPlan) {
+        LOForEach op = new LOForEach( plan );
+        op.setInnerPlan( innerPlan );
+        return buildOp( op, alias, parallel, inputAlias, null );
+    }
+    
+    void buildGenerateOp(LogicalPlan plan, List<LogicalExpressionPlan> exprPlans, List<Boolean> flattenFlags,
+            List<LogicalSchema> schemas) {
+        boolean[] flags = new boolean[ flattenFlags.size() ];
+        for( int i = 0; i < flattenFlags.size(); i++ )
+            flags[i] = flattenFlags.get( i );
+        LOGenerate op = new LOGenerate( plan, exprPlans, flags );
+        op.setUserDefinedSchema( schemas );
+        plan.add( op );
+    }
+    
+    StreamingCommand buildCommand(String cmd, List<String> shipPaths, List<String> cachePaths,
+            List<HandleSpec> inputHandleSpecs, List<HandleSpec> outputHandleSpecs,
+            String logDir, Integer limit) throws RecognitionException {
+        StreamingCommand command = null;
+        try {
+            command = buildCommand( cmd );
+            
+            // Process ship paths
+            if( shipPaths.size() == 0 ) {
+                command.setShipFiles( false );
+            } else {
+                for( String path : shipPaths )
+                    command.addPathToShip( path );
+            }
+            
+            // Process cache paths
+            for( String path : cachePaths )
+                command.addPathToCache( path );
+            
+            // Process input handle specs
+            for( HandleSpec spec : inputHandleSpecs )
+                command.addHandleSpec( Handle.INPUT, spec );
+            
+            // Process output handle specs
+            for( HandleSpec spec : outputHandleSpecs )
+                command.addHandleSpec( Handle.OUTPUT, spec );
+            
+            // error handling
+            command.setLogDir( logDir );
+            if( limit != null )
+                command.setLogFilesLimit( limit );
+        } catch(IOException e) {
+        }
+        
+        return command;
+    }
+    
+    StreamingCommand buildCommand(String cmd) throws RecognitionException {
+        try {
+            return new StreamingCommand( pigContext, splitArgs( cmd ) );
+        } catch (ParseException e) {
+            throw new RecognitionException();        }
+    }
+    
+    String buildStreamOp(String alias, Integer parallel, String inputAlias, StreamingCommand command, LogicalSchema schema)
+    throws RecognitionException {
+        try {
+            LOStream op = new LOStream( plan, pigContext.createExecutableManager(), command, schema );
+            return buildOp( op, alias, parallel, inputAlias, null );
+        } catch (ExecException ex) {
+            throw new RecognitionException();
         }
     }
     
+    // TODO: create specific exceptions to throw.
+    String buildNativeOp(Integer parallel, String inputJar, String cmd,
+            List<String> paths, String storeAlias, String loadAlias)
+    throws RecognitionException {
+        LONative op;
+        try {
+            op = new LONative( plan, inputJar, splitArgs( cmd ) );
+            pigContext.addJar( inputJar );
+            for( String path : paths )
+                pigContext.addJar( path );
+           buildOp( op, null, parallel, new ArrayList<String>(), null );
+            plan.connect( operators.get( storeAlias ), op );
+            LOLoad load = (LOLoad)operators.get( loadAlias );
+            plan.connect( op, load );
+            return load.getAlias();
+        } catch (ParseException e) {
+            throw new RecognitionException();
+        } catch (MalformedURLException e) {
+            throw new RecognitionException();
+        }
+    }
+    
+    static void setAlias(LogicalRelationalOperator op, String alias) {
+        if( alias == null )
+            alias = new OperatorKey( "test", getNextId() ).toString();
+        op.setAlias( alias );
+    }
+    
     static void setParallel(LogicalRelationalOperator op, Integer parallel) {
         if( parallel != null )
             op.setRequestedParallelism( parallel );
@@ -213,11 +338,61 @@ public class LogicalPlanBuilder {
         return bagFactory.newDefaultBag();
     }
     
-    LogicalExpressionPlan buildProjectStar(String opAlias) {
-        LogicalExpressionPlan plan = new LogicalExpressionPlan();
+    LogicalExpression buildProjectStar(LogicalExpressionPlan plan, String opAlias) {
         LogicalRelationalOperator op = (LogicalRelationalOperator)operators.get( opAlias );
-        new ProjectExpression( plan, 0, -1, op );
-        return plan;
+        return new ProjectExpression( plan, 0, -1, op );
     }
     
+    LogicalExpression buildUDF(LogicalExpressionPlan plan, String funcName, List<LogicalExpression> args) {
+        FuncSpec funcSpec = null;// TODO: get funcspec from function name.
+        return new UserFuncExpression( plan, funcSpec, args );
+    }
+    
+    private static final char SINGLE_QUOTE = '\u005c'';
+    private static final char DOUBLE_QUOTE = '"';
+    private static String[] splitArgs(String command) throws ParseException {
+        List<String> argv = new ArrayList<String>();
+
+        int beginIndex = 0;
+
+        while (beginIndex < command.length()) {
+            // Skip spaces
+            while (Character.isWhitespace(command.charAt(beginIndex))) {
+                ++beginIndex;
+            }
+
+            char delim = ' ';
+            char charAtIndex = command.charAt(beginIndex);
+            if (charAtIndex == SINGLE_QUOTE || charAtIndex == DOUBLE_QUOTE) {
+                delim = charAtIndex;
+            }
+
+            int endIndex = command.indexOf(delim, beginIndex+1);
+            if (endIndex == -1) {
+                if (Character.isWhitespace(delim)) {
+                    // Reached end of command-line
+                    argv.add(command.substring(beginIndex));
+                    break;
+                } else {
+                    // Didn't find the ending quote/double-quote
+                    throw new ParseException("Illegal command: " + command);
+                }
+            }
+
+            if (Character.isWhitespace(delim)) {
+                // Do not consume the space
+                argv.add(command.substring(beginIndex, endIndex));
+            } else {
+                argv.add(command.substring(beginIndex, endIndex+1));
+            }
+
+            beginIndex = endIndex + 1;
+        }
+
+        return argv.toArray(new String[argv.size()]);
+    }
+
+    private static long getNextId() {
+        return nodeIdGen.getNextNodeId( "test" );
+    }
 }

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Tue Jan  4 22:33:16 2011
@@ -37,6 +37,9 @@ package org.apache.pig.parser;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.builtin.GFAny;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.logical.expression.AddExpression;
 import org.apache.pig.newplan.logical.expression.AndExpression;
@@ -60,12 +63,14 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.expression.OrExpression;
 import org.apache.pig.newplan.logical.expression.RegexExpression;
 import org.apache.pig.newplan.logical.expression.SubtractExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
-import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE;
 import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.builtin.PigStreaming;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -84,23 +89,35 @@ public LogicalPlan getLogicalPlan() {
 query : ^( QUERY statement* )
 ;
 
-statement : general_statement | foreach_statement
+statement
+scope {
+   // Parsing context
+    String alias; // The alias of the current operator, either given or generated by the parser.
+    Integer parallel; // Parallelism
+    String inputAlias; // The alias of the input operator
+}
+ : general_statement
+ | foreach_statement
+ | split_statement
+;
+
+split_statement : split_clause
 ;
 
 general_statement 
-scope {
-    String alias;
-    Integer parallel;
-}
-@init {
-}
-: ^( STATEMENT ( alias { $general_statement::alias = $alias.name; } )? 
-  op_clause ( INTEGER { $general_statement::parallel = Integer.parseInt( $INTEGER.text ); } )? )
+: ^( STATEMENT ( alias { $statement::alias = $alias.name; } )? op_clause parallel_clause? )
+;
+
+parallel_clause
+ : ^( PARALLEL INTEGER )
+   {
+       $statement::parallel = Integer.parseInt( $INTEGER.text );
+   }
 ;
 
 // We need to handle foreach specifically because of the ending ';', which is not required 
 // if there is a nested block. This is ugly, but it gets the job done.
-foreach_statement : ^( STATEMENT alias? foreach_clause[$alias.text] )
+foreach_statement : ^( STATEMENT ( alias { $statement::alias = $alias.name; } )? foreach_clause )
 ;
 
 alias returns[String name]: IDENTIFIER { $name = $IDENTIFIER.text; }
@@ -119,56 +136,95 @@ op_clause returns[String alias] : 
           | cross_clause { $alias = $cross_clause.alias; }
           | join_clause { $alias = $join_clause.alias; }
           | union_clause { $alias = $union_clause.alias; }
-          | stream_clause
-          | mr_clause
-          | split_clause { $alias = $split_clause.alias; }
+          | stream_clause { $alias = $stream_clause.alias; }
+          | mr_clause { $alias = $mr_clause.alias; }
 ;
 
 define_clause 
- : ^( DEFINE alias cmd ) 
+ : ^( DEFINE alias cmd[$alias.name] ) 
+   {
+       builder.defineCommand( $alias.name, $cmd.command );
+   }
  | ^( DEFINE alias func_clause )
    {
        builder.defineFunction( $alias.name, $func_clause.funcSpec );
    }
 ;
 
-cmd : ^( EXECCOMMAND ( ship_clause | cache_caluse | input_clause | output_clause | error_clause )* )
-;
-
-ship_clause : ^( SHIP path_list? )
-;
-
-path_list : QUOTEDSTRING+
+cmd[String alias] returns[StreamingCommand command]
+@init {
+    List<String> shipPaths = new ArrayList<String>();
+    List<String> cachePaths = new ArrayList<String>();
+}
+ : ^( EXECCOMMAND ( ship_clause[shipPaths] | cache_caluse[cachePaths] | input_clause | output_clause | error_clause )* )
+   {
+       $command = builder.buildCommand( $EXECCOMMAND.text, shipPaths,
+           cachePaths, $input_clause.inputHandleSpecs, $output_clause.outputHandleSpecs,
+           $error_clause.dir == null? $alias : $error_clause.dir, $error_clause.limit );
+   }
 ;
 
-cache_caluse : ^( CACHE path_list )
+ship_clause[List<String> paths]
+ : ^( SHIP path_list[$paths]? )
 ;
 
-input_clause : ^( INPUT stream_cmd_list )
+path_list[List<String> paths]
+ : ( QUOTEDSTRING { $paths.add( builder.unquote( $QUOTEDSTRING.text ) ); } )+
 ;
 
-stream_cmd_list : stream_cmd+
+cache_caluse[List<String> paths]
+ : ^( CACHE path_list[$paths] )
 ;
 
-stream_cmd : ^( STDIN func_clause? )
-           | ^( STDOUT func_clause? )
-           | ^( QUOTEDSTRING func_clause? )
+input_clause returns[List<HandleSpec> inputHandleSpecs]
+@init {
+    $inputHandleSpecs = new ArrayList<HandleSpec>();
+}
+ : ^( INPUT ( stream_cmd { $inputHandleSpecs.add( $stream_cmd.handleSpec ); } )+ )
 ;
 
-output_clause : ^( OUTPUT stream_cmd_list )
+stream_cmd returns[HandleSpec handleSpec]
+@init {
+    String handleName = null;
+    FuncSpec fs = null;
+    String deserializer = PigStreaming.class.getName() + "()";
+    if( fs != null )
+        deserializer =  fs.toString();
+}
+@final {
+    $handleSpec = new HandleSpec( handleName, deserializer );
+}
+ : ^( STDIN { handleName = "stdin"; }
+      ( func_clause { fs = $func_clause.funcSpec; } )? )
+ | ^( STDOUT { handleName = "stdout"; }
+      ( func_clause { fs = $func_clause.funcSpec; } )? )
+ | ^( QUOTEDSTRING { handleName = builder.unquote( $QUOTEDSTRING.text ); }
+      ( func_clause { fs = $func_clause.funcSpec; } )? )
 ;
 
-error_clause : ^( ERROR error_cmd? )
+output_clause returns[List<HandleSpec> outputHandleSpecs]
+@init {
+    $outputHandleSpecs = new ArrayList<HandleSpec>();
+}
+ : ^( OUTPUT ( stream_cmd { $outputHandleSpecs.add( $stream_cmd.handleSpec ); } )+ )
 ;
 
-error_cmd : ^( QUOTEDSTRING INTEGER? )
+error_clause returns[String dir, Integer limit]
+@init {
+    $limit = StreamingCommand.MAX_TASKS;
+}
+ : ^( ERROR QUOTEDSTRING INTEGER? )
+   {
+       $dir = builder.unquote( $QUOTEDSTRING.text );
+       $limit = Integer.parseInt( $INTEGER.text );
+   }
 ;
 
 load_clause returns[String alias]
  : ^( LOAD filename func_clause? as_clause? )
   {
-      $alias = builder.buildLoadOp( $general_statement::alias,
-          $general_statement::parallel, $filename.filename, $func_clause.funcSpec, $as_clause.logicalSchema  );
+      $alias = builder.buildLoadOp( $statement::alias,
+          $statement::parallel, $filename.filename, $func_clause.funcSpec, $as_clause.logicalSchema  );
   }
 ;
 
@@ -257,7 +313,7 @@ func_clause returns[FuncSpec funcSpec]
    { 
        $funcSpec = builder.buildFuncSpec( $func_name.funcName, $func_args.args );
    }
- | ^( FUNC func_alias )
+ | ^( FUNC_REF func_alias )
    {
        $funcSpec = builder.lookupFunction( $func_alias.alias );
    }
@@ -286,33 +342,96 @@ func_args returns[List<String> args]
 ;
 
 group_clause returns[String alias]
- : ^( GROUP group_item_list QUOTEDSTRING? )
- | ^( COGROUP group_item_list QUOTEDSTRING? )
+scope {
+    MultiMap<Integer, LogicalExpressionPlan> groupPlans;
+    int inputIndex;
+    List<String> inputAliases;
+    List<Boolean> innerFlags;
+}
+@init {
+    $group_clause::groupPlans = new MultiMap<Integer, LogicalExpressionPlan>();
+    $group_clause::inputAliases = new ArrayList<String>();
+    $group_clause::innerFlags = new ArrayList<Boolean>();
+    GROUPTYPE groupType = GROUPTYPE.REGULAR;
+}
+ : ^( GROUP group_item+ ( group_type { groupType = $group_type.type; } )? )
+   {
+       $alias = builder.buildGroupOp( $statement::alias, $statement::parallel, 
+           $group_clause::inputAliases, $group_clause::groupPlans, groupType, $group_clause::innerFlags );
+   }
+ | ^( COGROUP group_item+ ( group_type { groupType = $group_type.type; } )? )
+   {
+       $alias = builder.buildGroupOp( $statement::alias, $statement::parallel, 
+           $group_clause::inputAliases, $group_clause::groupPlans, groupType, $group_clause::innerFlags );
+   }
 ;
 
-group_item_list : group_item+
+group_type returns[GROUPTYPE type]
+ : HINT_COLLECTED { $type = GROUPTYPE.COLLECTED; }
+ | HINT_MERGE { $type = GROUPTYPE.MERGE; }
+ | HINT_REGULAR { $type = GROUPTYPE.REGULAR; }
 ;
 
-group_item : rel ( join_group_by_clause[$rel.name] | ALL | ANY ) ( INNER | OUTER )?
+group_item
+@init { boolean inner = false; }
+ : rel ( join_group_by_clause 
+         { 
+             $group_clause::groupPlans.put( $group_clause::inputIndex, $join_group_by_clause.plans );
+         }
+         | ALL 
+         {
+             LogicalExpressionPlan plan = new LogicalExpressionPlan();
+             new ConstantExpression( plan, "all", new LogicalFieldSchema( null , null, DataType.CHARARRAY ) );
+             List<LogicalExpressionPlan> plans = new ArrayList<LogicalExpressionPlan>( 1 );
+             plans.add( plan );
+             $group_clause::groupPlans.put( $group_clause::inputIndex, plans );
+         }
+         | ANY
+         {
+             LogicalExpressionPlan plan = new LogicalExpressionPlan();
+             new UserFuncExpression( plan, new FuncSpec( GFAny.class.getName() ) );
+             List<LogicalExpressionPlan> plans = new ArrayList<LogicalExpressionPlan>( 1 );
+             plans.add( plan );
+             $group_clause::groupPlans.put( $group_clause::inputIndex, plans );
+         }
+        ) ( INNER { inner =  true; } | OUTER )?
+   {
+       $group_clause::inputAliases.add( $statement::inputAlias );
+       $group_clause::innerFlags.add( inner );
+       $group_clause::inputIndex++;
+   }
 ;
 
-rel returns[String name] : alias { $name = $alias.name; }
-                        | op_clause { $name = $op_clause.alias; }
+rel
+ : alias
+   {
+       $statement::inputAlias = $alias.name;
+   }
+ | op_clause
+   {
+       $statement::inputAlias = $op_clause.alias;
+   }
 ;
 
-flatten_generated_item
- : ( flatten_clause | expr[null] | STAR ) as_clause?
+flatten_generated_item returns[LogicalExpressionPlan plan, boolean flattenFlag, LogicalSchema schema]
+@init {
+    $plan = new LogicalExpressionPlan();
+}
+ : flatten_clause[$plan] { $flattenFlag = true; }
+   ( as_clause  { $schema = $as_clause.logicalSchema; } )?
+ | ( expr[$plan] | STAR { builder.buildProjectStar( $plan, $statement::inputAlias ); } )
+   ( field { $schema =  new LogicalSchema(); $schema.addField( $field.fieldSchema ); } )?
 ;
 
-flatten_clause
- : ^( FLATTEN expr[null] )
+flatten_clause[LogicalExpressionPlan plan]
+ : ^( FLATTEN expr[$plan] )
 ;
 
 store_clause returns[String alias]
  : ^( STORE alias filename func_clause? )
    {
-       $alias= builder.buildStoreOp( $general_statement::alias,
-          $general_statement::parallel, $alias.name, $filename.filename, $func_clause.funcSpec );
+       $alias= builder.buildStoreOp( $statement::alias,
+          $statement::parallel, $alias.name, $filename.filename, $func_clause.funcSpec );
    }
 ;
 
@@ -320,8 +439,8 @@ filter_clause returns[String alias]
 @init { LogicalExpressionPlan exprPlan = new LogicalExpressionPlan(); }
  : ^( FILTER rel cond[exprPlan] )
    {
-       $alias = builder.buildFilterOp( $general_statement::alias,
-          $general_statement::parallel, $rel.name, exprPlan );
+       $alias = builder.buildFilterOp( $statement::alias,
+          $statement::parallel, $statement::inputAlias, exprPlan );
    }
 ;
 
@@ -373,19 +492,27 @@ cond[LogicalExpressionPlan exprPlan] ret
        $expr = new RegexExpression( $exprPlan, $e1.expr, $e2.expr );
    }
  | func_eval[$exprPlan]
+   {
+       $expr = $func_eval.expr;
+   }
 ;
 
 func_eval[LogicalExpressionPlan plan] returns[LogicalExpression expr]
-@init { List<LogicalExpression> args = new ArrayList<LogicalExpression>(); }
+@init { 
+    List<LogicalExpression> args = new ArrayList<LogicalExpression>();
+}
  : ^( FUNC_EVAL func_name ( real_arg[$plan] { args.add( $real_arg.expr ); } )* )
    {
-       //TODO: FuncSpec funcSpec = builder.buildFuncSpec( $func_name.funcName, $func_args.args );
+       $expr = builder.buildUDF( $plan, $func_name.funcName, args );
    }
 ;
 
 real_arg [LogicalExpressionPlan plan] returns[LogicalExpression expr]
  : e = expr[$plan] { $expr = $e.expr; }
  | STAR
+   {
+       $expr = builder.buildProjectStar( $plan, $statement::inputAlias );
+   }
 ;
 
 expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]
@@ -433,19 +560,29 @@ expr[LogicalExpressionPlan plan] returns
 ;
 
 var_expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]
- : projectable_expr[$plan] ( dot_proj | pound_proj )*
+ : p = projectable_expr[$plan] ( dot_proj[$plan] | pound_proj[$plan] )*
 ;
 
 projectable_expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]
- : func_eval[$plan] { $expr = $func_eval.expr; }
- | col_ref[$plan] { }
- | bin_expr[$plan] { $expr = $bin_expr.expr; }
+ : func_eval[$plan]
+   {
+       $expr = $func_eval.expr;
+   }
+ | col_ref[$plan]
+   {
+   }
+ | bin_expr[$plan]
+   {
+       $expr = $bin_expr.expr;
+   }
 ;
 
-dot_proj : ^( PERIOD col_ref[null]+ )
+dot_proj[LogicalExpressionPlan plan] returns[LogicalExpression expr]
+ : ^( PERIOD col_ref[$plan]+ )
 ;
 
-pound_proj : ^( POUND ( QUOTEDSTRING | NULL ) )
+pound_proj[LogicalExpressionPlan plan] returns[LogicalExpression expr]
+ : ^( POUND ( QUOTEDSTRING | NULL ) )
 ;
 
 bin_expr[LogicalExpressionPlan plan] returns[LogicalExpression expr]
@@ -458,43 +595,44 @@ bin_expr[LogicalExpressionPlan plan] ret
 limit_clause returns[String alias]
  : ^( LIMIT rel INTEGER  )
    {
-       $alias = builder.buildLimitOp( $general_statement::alias,
-           $general_statement::parallel, $rel.name, Long.valueOf( $INTEGER.text ) );
+       $alias = builder.buildLimitOp( $statement::alias,
+           $statement::parallel, $statement::inputAlias, Long.valueOf( $INTEGER.text ) );
    }
  | ^( LIMIT rel LONGINTEGER )
    {
-       $alias = builder.buildLimitOp( $general_statement::alias,
-           $general_statement::parallel, $rel.name, Long.valueOf( $LONGINTEGER.text ) );
+       $alias = builder.buildLimitOp( $statement::alias,
+           $statement::parallel, $statement::inputAlias, Long.valueOf( $LONGINTEGER.text ) );
    }
 ;
 
 sample_clause returns[String alias]
  : ^( SAMPLE rel DOUBLENUMBER )
    {
-       $alias = builder.buildSampleOp( $general_statement::alias,
-           $general_statement::parallel, $rel.name, Double.valueOf( $DOUBLENUMBER.text ) );
+       $alias = builder.buildSampleOp( $statement::alias,
+           $statement::parallel, $statement::inputAlias, Double.valueOf( $DOUBLENUMBER.text ) );
    }
 ;
 
 order_clause returns[String alias]
- : ^( ORDER rel order_by_clause[$rel.name] func_clause? )
+ : ^( ORDER rel order_by_clause func_clause? )
    {
-       $alias = builder.buildOrderOp( $general_statement::alias,
-           $general_statement::parallel, $rel.name, $order_by_clause.plans, 
+       $alias = builder.buildOrderOp( $statement::alias,
+           $statement::parallel, $statement::inputAlias, $order_by_clause.plans, 
            $order_by_clause.ascFlags, $func_clause.funcSpec );
    }
 ;
 
-order_by_clause[String opAlias] returns[List<LogicalExpressionPlan> plans, List<Boolean> ascFlags]
+order_by_clause returns[List<LogicalExpressionPlan> plans, List<Boolean> ascFlags]
 @init {
     $plans = new ArrayList<LogicalExpressionPlan>();
     $ascFlags = new ArrayList<Boolean>();
 }
  : STAR {
-       LogicalExpressionPlan plan = builder.buildProjectStar( $opAlias );
+       LogicalExpressionPlan plan = new LogicalExpressionPlan();
+       builder.buildProjectStar( plan, $statement::inputAlias );
        $plans.add( plan );
    }
-   ( ASC | DESC { $ascFlags.add( false ); } )?
+   ( ASC { $ascFlags.add( true ); } | DESC { $ascFlags.add( false ); } )?
  | ( order_col
    {
        $plans.add( $order_col.plan );
@@ -513,8 +651,8 @@ order_col returns[LogicalExpressionPlan 
 distinct_clause returns[String alias]
  : ^( DISTINCT rel partition_clause? )
    {
-       $alias = builder.buildDistinctOp( $general_statement::alias,
-          $general_statement::parallel, $rel.name, $partition_clause.partitioner );
+       $alias = builder.buildDistinctOp( $statement::alias,
+          $statement::parallel, $statement::inputAlias, $partition_clause.partitioner );
    }
 ;
 
@@ -528,14 +666,14 @@ partition_clause returns[String partitio
 cross_clause returns[String alias]
  : ^( CROSS rel_list partition_clause? )
    {
-       $alias = builder.buildCrossOp( $general_statement::alias,
-          $general_statement::parallel, $rel_list.aliasList, $partition_clause.partitioner );
+       $alias = builder.buildCrossOp( $statement::alias,
+          $statement::parallel, $rel_list.aliasList, $partition_clause.partitioner );
    }
 ;
 
 rel_list returns[List<String> aliasList]
 @init { $aliasList = new ArrayList<String>(); }
- : ( rel { $aliasList.add( $rel.name ); } )+
+ : ( rel { $aliasList.add( $statement::inputAlias ); } )+
 ;
 
 join_clause returns[String alias]
@@ -552,17 +690,17 @@ scope {
 }
  : ^( JOIN join_sub_clause join_type? partition_clause? )
    {
-       $alias = builder.buildJoinOp( $general_statement::alias,
-          $general_statement::parallel, $join_clause::inputAliases, $join_clause::joinPlans,
+       $alias = builder.buildJoinOp( $statement::alias,
+          $statement::parallel, $join_clause::inputAliases, $join_clause::joinPlans,
           $join_type.type, $join_clause::innerFlags, $partition_clause.partitioner );
    }
 ;
 
 join_type returns[JOINTYPE type]
- : JOIN_TYPE_REPL { $type = JOINTYPE.REPLICATED; }
- | JOIN_TYPE_MERGE { $type = JOINTYPE.MERGE; }
- | JOIN_TYPE_SKEWED { $type = JOINTYPE.SKEWED; }
- | JOIN_TYPE_DEFAULT { $type = JOINTYPE.HASH; }
+ : HINT_REPL { $type = JOINTYPE.REPLICATED; }
+ | HINT_MERGE { $type = JOINTYPE.MERGE; }
+ | HINT_SKEWED { $type = JOINTYPE.SKEWED; }
+ | HINT_DEFAULT { $type = JOINTYPE.HASH; }
 ;
 
 join_sub_clause
@@ -578,53 +716,80 @@ join_sub_clause
 ;
 
 join_item
- : ^( JOIN_ITEM rel join_group_by_clause[$rel.name] )
+ : ^( JOIN_ITEM rel join_group_by_clause )
    {
-       $join_clause::inputAliases.add( $rel.name );
+       $join_clause::inputAliases.add( $statement::inputAlias );
        $join_clause::joinPlans.put( $join_clause::inputIndex, $join_group_by_clause.plans );
        $join_clause::inputIndex++;
    }
 ;
 
-join_group_by_clause[String alias] returns[List<LogicalExpressionPlan> plans]
-scope { String inputAlias; }
+join_group_by_clause returns[List<LogicalExpressionPlan> plans]
 @init {
-    $join_group_by_clause::inputAlias = $alias;
     $plans = new ArrayList<LogicalExpressionPlan>();
 }
  : ^( BY ( join_group_by_expr { $plans.add( $join_group_by_expr.plan ); } )+ )
 ;
 
 join_group_by_expr returns[LogicalExpressionPlan plan]
- : { $plan = new LogicalExpressionPlan(); } expr[$plan]
+@init {
+    $plan = new LogicalExpressionPlan();
+}
+ : expr[$plan]
  | STAR 
    {
-       $plan = builder.buildProjectStar( $join_group_by_clause::inputAlias );
+       builder.buildProjectStar( $plan, $statement::inputAlias );
    }
 ;
 
 union_clause returns[String alias]
  : ^( UNION ONSCHEMA? rel_list )
    {
-      $alias = builder.buildUnionOp( $general_statement::alias,
-          $general_statement::parallel, $rel_list.aliasList );
+      // TODO: onschema support
+      $alias = builder.buildUnionOp( $statement::alias,
+          $statement::parallel, $rel_list.aliasList );
    }
 ;
 
-foreach_clause[String alias] returns[LogicalRelationalOperator op] : ^( FOREACH rel nested_plan )
-;
-
-nested_plan : ^( NESTED_PLAN foreach_blk )
-            | ^( NESTED_PLAN generate_clause )
+foreach_clause returns[String alias]
+ : ^( FOREACH rel foreach_plan )
+   {
+       $alias = builder.buildForeachOp( $statement::alias,
+          $statement::parallel, $statement::inputAlias, $foreach_plan::innerPlan );
+   }
 ;
 
-foreach_blk : nested_command_list generate_clause
+foreach_plan
+scope {
+    LogicalPlan innerPlan;
+}
+@init {
+   $foreach_plan::innerPlan = new LogicalPlan();
+}
+ : ^( FOREACH_PLAN nested_blk )
+ | ^( FOREACH_PLAN generate_clause parallel_clause? )
 ;
 
-generate_clause : ^( GENERATE flatten_generated_item+ )
+nested_blk : nested_command* generate_clause
 ;
 
-nested_command_list : nested_command*
+generate_clause
+@init {
+    List<LogicalExpressionPlan> plans = new ArrayList<LogicalExpressionPlan>();
+    List<Boolean> flattenFlags = new ArrayList<Boolean>();
+    List<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
+}
+ : ^( GENERATE ( flatten_generated_item
+                 {
+                     plans.add( $flatten_generated_item.plan );
+                     flattenFlags.add( $flatten_generated_item.flattenFlag );
+                     schemas.add( $flatten_generated_item.schema );
+                 }
+               )+
+    )
+   {   
+       builder.buildGenerateOp( $foreach_plan::innerPlan, plans, flattenFlags, schemas );
+   }
 ;
 
 nested_command : ^( NESTED_CMD IDENTIFIER ( expr[null] | nested_op ) )
@@ -649,7 +814,7 @@ nested_alias_ref : IDENTIFIER
 nested_filter : ^( FILTER ( nested_alias_ref | nested_proj | expr[null] ) cond[null] )
 ;
 
-nested_sort : ^( ORDER ( nested_alias_ref | nested_proj | expr[null] )  order_by_clause[null] func_clause? )
+nested_sort : ^( ORDER ( nested_alias_ref | nested_proj | expr[null] )  order_by_clause func_clause? )
 ;
 
 nested_distinct : ^( DISTINCT ( nested_alias_ref | nested_proj | expr[null] ) )
@@ -658,25 +823,45 @@ nested_distinct : ^( DISTINCT ( nested_a
 nested_limit : ^( LIMIT ( nested_alias_ref | nested_proj | expr[null] ) INTEGER )
 ;
 
-stream_clause : ^( STREAM rel ( EXECCOMMAND | IDENTIFIER ) as_clause? )
+stream_clause returns[String alias]
+@init {
+    StreamingCommand cmd = null;
+}
+ : ^( STREAM rel ( EXECCOMMAND { cmd = builder.buildCommand( $EXECCOMMAND.text ); } 
+                 | IDENTIFIER { cmd = builder.lookupCommand( $IDENTIFIER.text ); } ) as_clause? )
+   {
+       $alias = builder.buildStreamOp( $statement::alias, $statement::parallel,
+          $statement::inputAlias, cmd, $as_clause.logicalSchema );
+   }
 ;
 
-mr_clause : ^( MAPREDUCE QUOTEDSTRING path_list? store_clause load_clause EXECCOMMAND? )
+mr_clause returns[String alias]
+@init {
+    List<String> paths = new ArrayList<String>();
+    String alias = $statement::alias;
+}
+ : ^( MAPREDUCE QUOTEDSTRING path_list[paths]? 
+     { $statement::alias = null; } store_clause 
+     { $statement::alias = alias; } load_clause
+     EXECCOMMAND? )
+   {
+       $alias = builder.buildNativeOp( $statement::parallel,
+           builder.unquote( $QUOTEDSTRING.text ), builder.unquote( $EXECCOMMAND.text ), 
+           paths, $store_clause.alias, $load_clause.alias );
+   }
 ;
 
-split_clause returns[String alias]
- : ^( SPLIT rel { $alias = builder.buildSplitOp( $general_statement::alias, 
-                  $general_statement::parallel, $rel.name ); } 
-      split_branch[$alias]+ )
-   
+split_clause
+ : ^( SPLIT rel { $statement::inputAlias = builder.buildSplitOp( $statement::inputAlias ); } 
+      split_branch+ )
 ;
 
-split_branch[String inputAlias] returns[String alias]
+split_branch
 @init { LogicalExpressionPlan splitPlan = new LogicalExpressionPlan(); }
  : ^( SPLIT_BRANCH IDENTIFIER cond[splitPlan] )
    {
-       $alias = builder.buildSplitOutputOp( $IDENTIFIER.text,
-          $general_statement::parallel, $inputAlias, splitPlan );
+       builder.buildSplitOutputOp( $IDENTIFIER.text,
+           $statement::parallel, $statement::inputAlias, splitPlan );
    }
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/QueryLexer.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryLexer.g?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryLexer.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryLexer.g Tue Jan  4 22:33:16 2011
@@ -266,16 +266,22 @@ DOUBLENUMBER : FLOATINGPOINT ( 'E' ( MIN
 FLOATNUMBER : DOUBLENUMBER ( 'F' )?
 ;
 
-JOIN_TYPE_REPL : '\'REPL\'' |  '\'REPLICATED\''
+HINT_REPL : '\'REPL\'' |  '\'REPLICATED\''
 ;
 
-JOIN_TYPE_SKEWED : '\'SKEWED\''
+HINT_SKEWED : '\'SKEWED\''
 ;
 
-JOIN_TYPE_MERGE : '\'MERGE\''
+HINT_MERGE : '\'MERGE\''
 ;
 
-JOIN_TYPE_DEFAULT : '\'HASH\'' | '\'DEFAULT\''
+HINT_DEFAULT : '\'HASH\'' | '\'DEFAULT\''
+;
+
+HINT_COLLECTED : '\'COLLECTED\''
+;
+
+HINT_REGULAR : '\'REGULAR\''
 ;
 
 QUOTEDSTRING :  '\'' (   ( ~ ( '\'' | '\\' | '\n' | '\r' ) )

Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Tue Jan  4 22:33:16 2011
@@ -36,6 +36,7 @@ tokens {
     QUERY;
     STATEMENT;
     FUNC;
+    FUNC_REF;
     FUNC_EVAL;
     CAST_EXPR;
     BIN_EXPR;
@@ -48,7 +49,7 @@ tokens {
     NESTED_CMD;
     NESTED_PROJ;
     SPLIT_BRANCH;
-    NESTED_PLAN;
+    FOREACH_PLAN;
     MAP_TYPE;
     TUPLE_TYPE;
     BAG_TYPE;
@@ -112,11 +113,19 @@ query : statement* 
      -> ^( QUERY statement* )
 ;
 
-statement : general_statement | foreach_statement
+statement : general_statement
+          | foreach_statement
+          | split_statement          
 ;
 
-general_statement : ( alias EQUAL )? op_clause ( PARALLEL INTEGER )? SEMI_COLON 
-                 -> ^( STATEMENT alias? op_clause INTEGER? )
+split_statement : split_clause SEMI_COLON!
+;
+
+general_statement : ( alias EQUAL )? op_clause parallel_clause? SEMI_COLON 
+                 -> ^( STATEMENT alias? op_clause parallel_clause? )
+;
+
+parallel_clause : PARALLEL^ INTEGER
 ;
 
 // We need to handle foreach specifically because of the ending ';', which is not required 
@@ -142,7 +151,6 @@ op_clause : define_clause 
           | union_clause
           | stream_clause
           | mr_clause
-          | split_clause
 ;
 
 define_clause : DEFINE^ alias ( cmd | func_clause )
@@ -174,10 +182,7 @@ stream_cmd : ( STDIN | STDOUT | QUOTEDST
 output_clause : OUTPUT^ LEFT_PAREN! stream_cmd_list RIGHT_PAREN!
 ;
 
-error_clause : ERROR^ LEFT_PAREN! error_cmd? RIGHT_PAREN!
-;
-
-error_cmd : QUOTEDSTRING^ ( LIMIT! INTEGER )?
+error_clause : ERROR^ LEFT_PAREN! QUOTEDSTRING ( LIMIT! INTEGER )? RIGHT_PAREN!
 ;
 
 load_clause : LOAD^ filename ( USING! func_clause )? as_clause?
@@ -225,7 +230,7 @@ map_type : MAP LEFT_BRACKET RIGHT_BRACKE
 func_clause : func_name LEFT_PAREN func_args? RIGHT_PAREN
            -> ^( FUNC func_name func_args? )
             | func_alias
-           -> ^( FUNC func_alias )
+           -> ^( FUNC_REF func_alias )
 ;
 
 func_name : eid ( ( PERIOD | DOLLAR ) eid )*
@@ -238,7 +243,10 @@ func_args : QUOTEDSTRING ( COMMA QUOTEDS
          -> QUOTEDSTRING+
 ;
 
-group_clause : ( GROUP | COGROUP )^ group_item_list ( USING! QUOTEDSTRING )?
+group_clause : ( GROUP | COGROUP )^ group_item_list ( USING! group_type )?
+;
+
+group_type : HINT_COLLECTED | HINT_MERGE | HINT_REGULAR
 ;
 
 group_item_list : group_item ( COMMA group_item )*
@@ -251,7 +259,8 @@ group_item : rel ( join_group_by_clause 
 rel : alias | LEFT_PAREN! op_clause RIGHT_PAREN!
 ;
 
-flatten_generated_item : ( flatten_clause | expr | STAR ) as_clause?
+flatten_generated_item : flatten_clause as_clause?
+                       | ( expr | STAR ) ( AS! field )?
 ;
 
 flatten_clause : FLATTEN^ LEFT_PAREN! expr RIGHT_PAREN!
@@ -377,7 +386,7 @@ rel_list : rel ( COMMA rel )*
 join_clause : JOIN^ join_sub_clause ( USING! join_type )? partition_clause?
 ;
 
-join_type : JOIN_TYPE_REPL | JOIN_TYPE_MERGE | JOIN_TYPE_SKEWED | JOIN_TYPE_DEFAULT
+join_type : HINT_REPL | HINT_MERGE | HINT_SKEWED | HINT_DEFAULT
 ;
 
 join_sub_clause : join_item ( LEFT | RIGHT | FULL ) OUTER? join_item
@@ -405,16 +414,16 @@ join_group_by_expr : expr | STAR
 union_clause : UNION^ ONSCHEMA? rel_list
 ;
 
-foreach_clause : FOREACH^ rel nested_plan
+foreach_clause : FOREACH^ rel foreach_plan
 ;
 
-nested_plan : foreach_blk SEMI_COLON?
-           -> ^( NESTED_PLAN foreach_blk )
-            | ( generate_clause SEMI_COLON )
-           -> ^( NESTED_PLAN generate_clause )
+foreach_plan : nested_blk SEMI_COLON?
+           -> ^( FOREACH_PLAN nested_blk )
+            | ( generate_clause parallel_clause? SEMI_COLON )
+           -> ^( FOREACH_PLAN generate_clause parallel_clause? )
 ;
 
-foreach_blk : LEFT_CURLY! nested_command_list ( generate_clause SEMI_COLON! ) RIGHT_CURLY!
+nested_blk : LEFT_CURLY! nested_command_list ( generate_clause SEMI_COLON! ) RIGHT_CURLY!
 ;
 
 generate_clause : GENERATE flatten_generated_item ( COMMA flatten_generated_item )*

Modified: pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java Tue Jan  4 22:33:16 2011
@@ -18,45 +18,69 @@
 
 package org.apache.pig.parser;
 
-import java.io.IOException;
 
 import junit.framework.Assert;
 
-import org.antlr.runtime.RecognitionException;
 import org.junit.Test;
 
 public class TestLogicalPlanGenerator {
     @Test
-    public void test1() throws RecognitionException, IOException, ParsingFailureException {
-        try {
-        ParserTestingUtils.generateLogicalPlan( 
-        		"A = load 'x' using org.apache.pig.TextLoader( 'a', 'b' ) as ( u:int, v:long, w:bytearray); " + 
-        		"B = limit A 100; " +
-        		"C = filter B by 2 > 1; " +
-        		"D = load 'y' as (d1, d2); " +
-        		"E = join C by ( $0, $1 ), D by ( d1, d2 ) using 'replicated' parallel 16; " +
-        		"F = store E into 'output';" );
-        } catch(Exception ex) {
-            Assert.assertTrue( false );// should never come here.
-        }
+    public void test1() {
+        String query = "A = load 'x' using org.apache.pig.TextLoader( 'a', 'b' ) as ( u:int, v:long, w:bytearray); " + 
+                       "B = limit A 100; " +
+                       "C = filter B by 2 > 1; " +
+                       "D = load 'y' as (d1, d2); " +
+                       "E = join C by ( $0, $1 ), D by ( d1, d2 ) using 'replicated' parallel 16; " +
+                       "F = store E into 'output';";
+        generateLogicalPlan( query );
     }
 
     @Test
-    public void test2() throws RecognitionException, IOException, ParsingFailureException {
+    public void test2() {
+        String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " + 
+                       "B = distinct A partition by org.apache.pig.Identity; " +
+                       "C = sample B 0.49; " +
+                       "D = order C by $0, $1; " +
+                       "E = load 'y' as (d1, d2); " +
+                       "F = union onschema D, E; " +
+                       "G = load 'z' as (g1:int, g2:tuple(g21, g22)); " +
+                       "H = cross F, G; " +
+                       "split H into I if 10 > 5, J if 'world' eq 'hello', K if 77 <= 200; " +
+                       "L = store J into 'output';";
+        generateLogicalPlan( query );
+    }
+
+    @Test
+    public void test3() {
+    }
+    
+    private void generateLogicalPlan(String query) {
         try {
-        ParserTestingUtils.generateLogicalPlan( 
-        		"A = load 'x' as ( u:int, v:long, w:bytearray); " + 
-        		"B = distinct A partition by org.apache.pig.Identity; " +
-        		"C = sample B 0.49; " +
-        		"D = order C by $0, $1; " +
-        		"E = load 'y' as (d1, d2); " +
-        		"F = union onschema D, E; " +
-        		"G = load 'z' as (g1:int, g2:tuple(g21, g22)); " +
-        		"H = cross F, G; " +
-        		"I = split H into I if 10 > 5, J if 'world' eq 'hello', K if 77 <= 200; " +
-        		"L = store J into 'output';" );
+            ParserTestingUtils.generateLogicalPlan( query );
         } catch(Exception ex) {
-            Assert.assertTrue( false );// should never come here.
+            Assert.fail( "Failed to generate logical plan for query [" + query + "] due to exception: " + ex );
         }
     }
+
+    @Test
+    public void test4() {
+        String query = "A = load 'x'; " + 
+                       "B = mapreduce '" + "myjar.jar" + "' " +
+                           "Store A into 'table_testNativeMRJobSimple_input' "+
+                           "Load 'table_testNativeMRJobSimple_output' "+
+                           "`org.apache.pig.test.utils.WordCount -files " + "file " +
+                           "table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output " +
+                           "stopworld.file" + "`;" +
+                        "C = Store B into 'output';";
+        generateLogicalPlan( query );
+    }
+
+    // Test define function.
+    @Test
+    public void test5() {
+        String query = "define myudf org.apache.pig.TextLoader( 'test', 'data' );" +
+                       "A = load 'x' using myudf;" +
+                       "store A into 'y';";
+        generateLogicalPlan( query );
+    }
 }

Modified: pig/trunk/test/org/apache/pig/parser/TestParser.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestParser.pig?rev=1055216&r1=1055215&r2=1055216&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestParser.pig (original)
+++ pig/trunk/test/org/apache/pig/parser/TestParser.pig Tue Jan  4 22:33:16 2011
@@ -77,7 +77,7 @@ store countInactiveAcct into '/user/kale
 store inactiveAccounts into '/user/kaleidoscope/pow_stats/20080228/acct/InactiveAcct';
 
 --split
-G = Split A into X if $0 > 0, Y if $0 == 0;
+Split A into X if $0 > 0, Y if $0 == 0;
 
 --union
 H = union onschema A, B;